Support orchestration id reuse policy #7308

Merged: 12 commits, Dec 20, 2023



@kaibocai kaibocai commented Dec 15, 2023

Description

Issue reference

This PR updates the logic to support orchestration ID reuse. More details can be found in microsoft/durabletask-go#42 and #7101.
The corresponding protobuf updates can be found in microsoft/durabletask-protobuf#19.

  1. This PR requires "Orchestration ID reuse policies" (microsoft/durabletask-go#51) in durabletask-go.
  2. This PR upgrades the OTEL dependency to stay compatible with durabletask-go, which uses a newer version.
  3. This PR updates the purge-instance logic so that only instances in [Completed, Failed, Terminated] are purged.
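The purge rule in item 3 can be sketched as follows. This is a minimal illustration, not the PR's code: the `Status` type, the in-memory `store` map, and `ErrNotPurgeable` are hypothetical stand-ins introduced only for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Status is a hypothetical stand-in for the orchestration runtime status enum.
type Status int

const (
	Running Status = iota
	Pending
	Completed
	Failed
	Terminated
)

// ErrNotPurgeable is a hypothetical error for instances that are still active.
var ErrNotPurgeable = errors.New("instance is not in a terminal status")

// canPurge reports whether an instance in status s may be purged.
func canPurge(s Status) bool {
	switch s {
	case Completed, Failed, Terminated:
		return true
	default:
		return false
	}
}

// purge deletes the instance from store only when it is in a terminal status.
func purge(store map[string]Status, id string) error {
	s, ok := store[id]
	if !ok {
		return fmt.Errorf("instance %q not found", id)
	}
	if !canPurge(s) {
		return ErrNotPurgeable
	}
	delete(store, id)
	return nil
}

func main() {
	store := map[string]Status{"a": Running, "b": Failed}
	fmt.Println(purge(store, "a")) // running instance: rejected
	fmt.Println(purge(store, "b")) // failed instance: purged
}
```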

Please reference the issue this PR will close: #7101

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

@kaibocai kaibocai requested review from a team as code owners December 15, 2023 17:41
Signed-off-by: kaibocai <kaibocai@microsoft.com>
Signed-off-by: kaibocai <kaibocai@microsoft.com>
@kaibocai kaibocai changed the title from "support orchestration id reuse policy" to "Support orchestration id reuse policy" Dec 15, 2023
Signed-off-by: kaibocai <kaibocai@microsoft.com>
@kaibocai
Contributor Author

Tagging @cgillum, @ItalyPaleAle, and @RyanLettieri for review.

Signed-off-by: kaibocai <kaibocai@microsoft.com>
for _, opt := range GetTestOptions() {
suffix := opt(engine)
t.Run(opt(engine), func(t *testing.T) {
Contributor

Should we be using suffix here?

Suggested change
- t.Run(opt(engine), func(t *testing.T) {
+ t.Run(suffix, func(t *testing.T) {
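Why reusing `suffix` matters can be shown with a small sketch. It assumes that each option function both configures the engine and returns a name, so a second call would apply the option again. The `engine`, `option`, and `countingOption` names are hypothetical and exist only for this example; a plain callback stands in for `t.Run`.

```go
package main

import "fmt"

// engine is a hypothetical stand-in for the workflow engine under test.
type engine struct{ configured int }

// option configures the engine and returns a suffix for the subtest name.
type option func(*engine) string

// countingOption records how many times it has been applied.
func countingOption(e *engine) string {
	e.configured++
	return "counting option"
}

// runAll applies each option exactly once and reuses the returned suffix
// as the name passed to run (which stands in for t.Run here).
func runAll(e *engine, opts []option, run func(name string)) {
	for _, opt := range opts {
		suffix := opt(e) // call once; do not call opt(e) again for the name
		run(suffix)
	}
}

func main() {
	e := &engine{}
	runAll(e, []option{countingOption}, func(name string) {
		fmt.Println("running:", name)
	})
	fmt.Println("option calls:", e.configured)
}
```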

@@ -62,6 +62,11 @@ type recoverableError struct {
cause error
}

type PolicyAndEventData struct {
Contributor

Let's give this a more specific name, like CreateWorkflowInstanceRequest

@@ -62,6 +62,11 @@ type recoverableError struct {
cause error
}

type PolicyAndEventData struct {
Policy *api.OrchestrationIdReusePolicy `json:"policy"`
EventData []byte `json:"eventData"`
Contributor

For consistency with existing code, I suggest StartEventBytes.

Suggested change
- EventData []byte `json:"eventData"`
+ StartEventBytes []byte `json:"startEventBytes"`
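With the names proposed in this review, the request type might look like the sketch below. The `reusePolicy` struct is a simplified, hypothetical stand-in for `api.OrchestrationIdReusePolicy`, and `roundTrip` is an illustrative helper, not part of the PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reusePolicy is a hypothetical, simplified stand-in for
// api.OrchestrationIdReusePolicy.
type reusePolicy struct {
	Action string `json:"action"`
}

// CreateWorkflowInstanceRequest bundles the reuse policy with the
// serialized start event, using the names suggested in review.
type CreateWorkflowInstanceRequest struct {
	Policy          *reusePolicy `json:"policy"`
	StartEventBytes []byte       `json:"startEventBytes"`
}

// roundTrip encodes the request to JSON and decodes it again.
// encoding/json writes []byte fields as base64 strings.
func roundTrip(req CreateWorkflowInstanceRequest) (CreateWorkflowInstanceRequest, error) {
	var out CreateWorkflowInstanceRequest
	data, err := json.Marshal(req)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(data, &out)
	return out, err
}

func main() {
	req := CreateWorkflowInstanceRequest{
		Policy:          &reusePolicy{Action: "ignore"},
		StartEventBytes: []byte("start"),
	}
	out, err := roundTrip(req)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Policy.Action, string(out.StartEventBytes))
}
```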

// orchestration already exists, apply reuse id policy
runtimeState := getRuntimeState(actorID, state)
runtimeStatus := runtimeState.RuntimeStatus()
targetStatusValues := backend.BuildStatusSet(reuseIDPolicy.GetOperationStatus())
Contributor

This goes back to my comment on the durabletask-go repo, but I don't think we need to expose a BuildStatusSet utility function for this simple purpose. I think a simple for-loop over reuseIDPolicy.GetOperationStatus() and comparing to runtimeStatus on each loop iteration is good enough since we only have a small number of possible status values. This might even be more performant than creating a map and doing hashing operations for the key lookup.
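A minimal sketch of the for-loop approach described above, assuming a small status enum. The `Status` type and its constants are hypothetical stand-ins for the real runtime status values, and `matchesAny` is an illustrative name.

```go
package main

import "fmt"

// Status is a hypothetical stand-in for the orchestration runtime status.
type Status int

const (
	Running Status = iota
	Completed
	Failed
	Terminated
)

// matchesAny reports whether current appears in targets.
// A linear scan avoids building a map when the list is short.
func matchesAny(current Status, targets []Status) bool {
	for _, s := range targets {
		if s == current {
			return true
		}
	}
	return false
}

func main() {
	targets := []Status{Completed, Failed}
	fmt.Println(matchesAny(Failed, targets))
	fmt.Println(matchesAny(Running, targets))
}
```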


func (wf *workflowActor) createInstance(ctx context.Context, actorID string, startEvent *backend.HistoryEvent, state *workflowState) error {
Contributor

I'm a little confused about the function name here. It's called createInstance but the only thing it does is create a reminder? Can we give this a more specific name, like scheduleWorkflowStart?

@@ -209,6 +255,36 @@ func (wf *workflowActor) createWorkflowInstance(ctx context.Context, actorID str
return wf.saveInternalState(ctx, actorID, state)
}

// This method cleans up a workflow associated with the given actorID
func (wf *workflowActor) cleanupOrchestrationStateInternal(ctx context.Context, actorID string, state *workflowState, requireCompleted bool) error {
runtimeState := getRuntimeState(actorID, state)
Contributor

We've already loaded the runtime state in createWorkflowInstance. Should we reuse that and pass it as a parameter rather than getting the runtime state again?

func (wf *workflowActor) cleanupOrchestrationStateInternal(ctx context.Context, actorID string, state *workflowState, requireCompleted bool) error {
runtimeState := getRuntimeState(actorID, state)
runtimeStatus := runtimeState.RuntimeStatus()
isCompleted := runtimeStatus == api.COMPLETED || runtimeStatus == api.TERMINATED || runtimeStatus == api.FAILED
Contributor

This requires a change in durabletask-go, but I wonder if we should prefix these constants with RUNTIME_STATUS_ so that it's clear what these values are for.

func (wf *workflowActor) cleanupOrchestrationStateInternal(ctx context.Context, actorID string, state *workflowState, requireCompleted bool) error {
runtimeState := getRuntimeState(actorID, state)
runtimeStatus := runtimeState.RuntimeStatus()
isCompleted := runtimeStatus == api.COMPLETED || runtimeStatus == api.TERMINATED || runtimeStatus == api.FAILED
Contributor

Can't we just use the existing runtimeStatus.IsCompleted() for this instead of checking specific status values? That would be more maintainable.

Contributor Author

I didn't use runtimeStatus.IsCompleted() because I wasn't sure it is equivalent to checking for [COMPLETED, TERMINATED, FAILED]. From the code, the completedEvent is assigned when the orchestration completes, at https://github.com/microsoft/durabletask-go/blob/aa335e20a192395649eb9ede97b57c9f647238ea/backend/runtimestate.go#L134. When an orchestration is terminated, however, we don't assign a completedEvent; instead we create an ExecutionTerminatedEvent at https://github.com/microsoft/durabletask-go/blob/aa335e20a192395649eb9ede97b57c9f647238ea/backend/runtimestate.go#L197. So it seems runtimeStatus.IsCompleted() only checks whether the orchestration completed, not whether it was terminated or failed.

Contributor

@cgillum cgillum Dec 15, 2023

According to this integration test code, a terminated orchestration results in runtimeStatus.IsCompleted() being true.

The second code block you're referencing is specifically for handling cascading terminate. It's not directly related to how we represent a currently terminated orchestration.

Contributor

Actually, hang on - I need to double check. The code I pointed to is for orchestration metadata, not the orchestration runtime state object...

Contributor

Okay, so confirming this wasn't as straightforward as I thought. But the short answer is that a terminated orchestration should always be completed. When an orchestration processes an ExecutionTerminated event, the SDK will generate a completedEvent with a status of "Terminated". This can be seen in the SDK code here.
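The behavior described here can be modeled with a toy sketch. It assumes that the runtime state counts as completed whenever a completion event has been recorded, and that processing a termination records such an event with a "Terminated" status. The `runtimeState` type below is a hypothetical simplification, not durabletask-go's actual type.

```go
package main

import "fmt"

// Status is a hypothetical stand-in for the final orchestration status.
type Status string

const (
	StatusCompleted  Status = "Completed"
	StatusFailed     Status = "Failed"
	StatusTerminated Status = "Terminated"
)

// completedEvent models the event recorded when an orchestration ends.
type completedEvent struct{ status Status }

// runtimeState is a simplified, hypothetical model of the runtime state.
type runtimeState struct {
	completed *completedEvent
}

// IsCompleted is true once any completion event has been recorded,
// regardless of the final status it carries.
func (s *runtimeState) IsCompleted() bool { return s.completed != nil }

// finish records a completion event with the given final status.
func (s *runtimeState) finish(status Status) {
	s.completed = &completedEvent{status: status}
}

// terminate models processing a termination: it records a completion
// event whose status is Terminated.
func (s *runtimeState) terminate() { s.finish(StatusTerminated) }

func main() {
	s := &runtimeState{}
	fmt.Println(s.IsCompleted()) // nothing recorded yet
	s.terminate()
	fmt.Println(s.IsCompleted(), s.completed.status)
}
```

Under this model a single `IsCompleted()` call covers the completed, failed, and terminated cases, which is why the explicit three-way status comparison becomes unnecessary.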

Contributor Author

Got it. Updated!

Signed-off-by: kaibocai <kaibocai@microsoft.com>

codecov bot commented Dec 15, 2023

Codecov Report

Attention: 17 lines in your changes are missing coverage. Please review.

Comparison is base (ef925c9) 64.59% compared to head (0205b9c) 64.60%.
Report is 1 commits behind head on master.

❗ Current head 0205b9c differs from pull request most recent head 33f1fd7. Consider uploading reports for the commit 33f1fd7 to get more accurate results

Files                               Patch %   Lines
pkg/runtime/wfengine/workflow.go    65.90%    8 Missing and 7 partials ⚠️
pkg/runtime/wfengine/backend.go     80.00%    1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #7308      +/-   ##
==========================================
+ Coverage   64.59%   64.60%   +0.01%     
==========================================
  Files         226      226              
  Lines       21191    21223      +32     
==========================================
+ Hits        13689    13712      +23     
- Misses       6324     6329       +5     
- Partials     1178     1182       +4     


Signed-off-by: kaibocai <kaibocai@microsoft.com>

simplify complete check

Signed-off-by: kaibocai <kaibocai@microsoft.com>
Contributor

@cgillum cgillum left a comment

Just some minor things.

}
wf.states.Delete(actorID)
return nil
return wf.cleanupOrchestrationStateInternal(ctx, actorID, state, true && !runtimeState.IsCompleted())
Contributor

Suggested change
- return wf.cleanupOrchestrationStateInternal(ctx, actorID, state, true && !runtimeState.IsCompleted())
+ return wf.cleanupOrchestrationStateInternal(ctx, actorID, state, !runtimeState.IsCompleted())

switch reuseIDPolicy.GetAction() {
case api.REUSE_ID_ACTION_IGNORE:
// Log a warning and skip creating a new instance
wfLogger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", actorID)
Contributor

For consistency, we should follow the logging format that's used elsewhere in this file. For example:

Suggested change
- wfLogger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", actorID)
+ wfLogger.Warnf("Workflow actor '%s': ignoring request to recreate the current workflow instance", actorID)

@@ -209,6 +263,33 @@ func (wf *workflowActor) createWorkflowInstance(ctx context.Context, actorID str
return wf.saveInternalState(ctx, actorID, state)
}

// This method cleans up a workflow associated with the given actorID
func (wf *workflowActor) cleanupOrchestrationStateInternal(ctx context.Context, actorID string, state *workflowState, requiredAndNotCompleted bool) error {
// Only purge orchestration in the ['COMPLETED', 'FAILED', 'TERMINATED'] statuses,
Contributor

This comment should probably be updated.

Signed-off-by: kaibocai <kaibocai@microsoft.com>

update method name

Signed-off-by: kaibocai <kaibocai@microsoft.com>
Contributor

@cgillum cgillum left a comment

LGTM! I'll go ahead and merge the durabletask-go changes and create a release so that this PR can be unblocked.

@kaibocai
Contributor Author

> LGTM! I'll go ahead and merge the durabletask-go changes and create a release so that this PR can be unblocked.

Sorry, I didn't see this and created the release myself.

@kaibocai
Contributor Author

Hi @cgillum, @ItalyPaleAle and @RyanLettieri, durabletask-go has been updated to v0.4.0, and I think this PR is ready for final review and merge. Please take another look. Thanks!

Signed-off-by: kaibocai <kaibocai@microsoft.com>
@kaibocai kaibocai force-pushed the kaibocai/reuse-id branch 2 times, most recently from 914f7ee to 0280640 Compare December 18, 2023 22:18
Signed-off-by: kaibocai <kaibocai@microsoft.com>

update test time compare

Signed-off-by: kaibocai <kaibocai@microsoft.com>

fix assert format

Signed-off-by: kaibocai <kaibocai@microsoft.com>
@kaibocai
Contributor Author

Tagging @mukundansundar for review as well.

Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
return wf.createIfCompleted(ctx, runtimeState, actorID, state, startEvent)
}

switch reuseIDPolicy.GetAction() {
Contributor

The PR LGTM. I just wanted to understand how we are skipping the creation of a new orchestration here (i.e., the SkipIfExists option from microsoft/durabletask-go#42 (comment)).

Contributor Author

case api.REUSE_ID_ACTION_IGNORE:
		// Log a warning and skip creating a new instance
		wfLogger.Warnf("Workflow actor '%s': ignoring request to recreate the current workflow instance", actorID)
		return nil

If a user chooses REUSE_ID_ACTION_IGNORE and an instance with the same instance ID already exists, we enter the block above, log a message, and return immediately without creating anything.
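The dispatch on the reuse action can be sketched as below. Only the IGNORE case is quoted in this thread; the terminate and error branches, along with the `action` type, `decide` function, and `errAlreadyExists`, are assumptions made for illustration and do not reproduce the PR's code.

```go
package main

import (
	"errors"
	"fmt"
)

// action is a hypothetical stand-in for the reuse-ID action enum.
type action int

const (
	actionError     action = iota // assumed: reject the duplicate request
	actionIgnore                  // mirrors REUSE_ID_ACTION_IGNORE
	actionTerminate               // assumed: replace the existing instance
)

// errAlreadyExists is a hypothetical error for rejected duplicates.
var errAlreadyExists = errors.New("an instance with this ID already exists")

// decide returns what a create request should do when an instance with
// the same ID already exists and its status matches the policy.
func decide(a action) (string, error) {
	switch a {
	case actionIgnore:
		// Log a warning in real code, then return without creating anything.
		return "skip", nil
	case actionTerminate:
		// Assumed behavior: clean up the old instance, then create a new one.
		return "terminate-and-recreate", nil
	default:
		return "", errAlreadyExists
	}
}

func main() {
	for _, a := range []action{actionIgnore, actionTerminate, actionError} {
		step, err := decide(a)
		fmt.Printf("step=%q err=%v\n", step, err)
	}
}
```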

Contributor

My bad, I missed that Ignore is Skip. Thanks for the clarification.

@cgillum
Contributor

cgillum commented Dec 19, 2023

Hi @artursouza, @yaron2, @mukundansundar. This workflow PR has been signed off and is ready to be merged.

@yaron2 yaron2 merged commit b118deb into dapr:master Dec 20, 2023
16 of 20 checks passed
@ItalyPaleAle ItalyPaleAle added this to the v1.13 milestone Jan 15, 2024

Successfully merging this pull request may close these issues.

[Workflow] Support for workflow ID reuse policies
5 participants