Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cascade terminate/Purge Workflow Support #7340

Merged
merged 43 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
751722f
Updating protos
shivamkm07 Jan 2, 2024
ea2dedc
enable recursive terminate/purge
shivamkm07 Jan 2, 2024
5175161
Updating dtf-go
shivamkm07 Jan 2, 2024
24f3e7d
updating contrib
shivamkm07 Jan 2, 2024
abe6005
Correct proto generated files
shivamkm07 Jan 2, 2024
41e16a4
update comment
shivamkm07 Jan 2, 2024
bb0be0e
Merge branch 'master' into cascade_terminatePurgeWf
mukundansundar Jan 3, 2024
2d55758
implementing getOrchestrationRuntimeState
shivamkm07 Jan 3, 2024
89e0084
Adding unit tests
shivamkm07 Jan 3, 2024
56273a1
Merge branch 'master' into cascade_terminatePurgeWf
shivamkm07 Jan 4, 2024
04ca172
Merge branch 'master' into cascade_terminatePurgeWf
daixiang0 Jan 8, 2024
1d15ad8
merge master
shivamkm07 Jan 9, 2024
1f76406
Merge branch 'master' into cascade_terminatePurgeWf
mukundansundar Jan 10, 2024
b5ceb30
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 10, 2024
65bebca
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 11, 2024
eeaed71
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 11, 2024
d9ff995
Merge branch 'master' into cascade_terminatePurgeWf
mukundansundar Jan 11, 2024
7e4c476
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 11, 2024
fb499a6
make modtidy-all
shivamkm07 Jan 12, 2024
d7e7ffb
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 12, 2024
4ac7bcf
Adding recursive option in query parameter
shivamkm07 Jan 12, 2024
f02762d
Adding integration test for workflow
shivamkm07 Jan 12, 2024
7a53708
fix bug in Creating sub-orchestrations
shivamkm07 Jan 12, 2024
b20da16
Merge remote-tracking branch 'upstream/master' into cascade_terminate…
shivamkm07 Jan 12, 2024
9c4312d
Setting recursive/terminate purge to default and adding tests for same
shivamkm07 Jan 12, 2024
f61361b
linter fixes
shivamkm07 Jan 12, 2024
bfa7fed
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 12, 2024
7a573e8
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 12, 2024
9e8ba8d
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 12, 2024
e3938f8
merge master
shivamkm07 Jan 15, 2024
507c3f6
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 15, 2024
56dca91
Removing fix from this PR
shivamkm07 Jan 15, 2024
49db0dd
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 15, 2024
1a87247
Merge branch 'master' into cascade_terminatePurgeWf
mukundansundar Jan 16, 2024
b456cb0
merge master
shivamkm07 Jan 16, 2024
614fc7c
renaming to
shivamkm07 Jan 16, 2024
ba65bde
linter fixes
shivamkm07 Jan 16, 2024
b5f8c4b
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 16, 2024
86f9b63
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 16, 2024
3dce3f5
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 16, 2024
1d033a0
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 16, 2024
5150403
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 16, 2024
87ef171
Merge branch 'master' into cascade_terminatePurgeWf
dapr-bot Jan 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ test-integration: test-deps
################################################################################
# Target: lint #
################################################################################
# Please use golangci-lint version v1.51.2 , otherwise you might encounter errors.
# You can download version v1.51.2 at https://github.com/golangci/golangci-lint/releases/tag/v1.51.2
# Please use golangci-lint version v1.55.2 , otherwise you might encounter errors.
# You can download version v1.55.2 at https://github.com/golangci/golangci-lint/releases/tag/v1.55.2
.PHONY: lint
lint: check-linter
$(GOLANGCI_LINT) run --build-tags=allcomponents --timeout=20m
Expand Down
4 changes: 4 additions & 0 deletions dapr/proto/runtime/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,8 @@ message TerminateWorkflowRequest {
string instance_id = 1 [json_name = "instanceID"];
// Name of the workflow component.
string workflow_component = 2 [json_name = "workflowComponent"];
// Indicates whether this is a non_recursive terminate request
bool non_recursive = 3 [json_name = "non_recursive"];
}

// PauseWorkflowRequest is the request for PauseWorkflowBeta1.
Expand Down Expand Up @@ -1102,6 +1104,8 @@ message PurgeWorkflowRequest {
string instance_id = 1 [json_name = "instanceID"];
// Name of the workflow component.
string workflow_component = 2 [json_name = "workflowComponent"];
// Indicates whether this is a non_recursive purge request
bool non_recursive = 3 [json_name = "non_recursive"];
}

// ShutdownRequest is the request for Shutdown.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/jhump/protoreflect v1.15.2
github.com/kelseyhightower/envconfig v1.4.0
github.com/lestrrat-go/jwx/v2 v2.0.18
github.com/microsoft/durabletask-go v0.4.0
github.com/microsoft/durabletask-go v0.4.1-0.20231222205218-fc5756737d3c
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/prometheus/client_golang v1.16.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1160,8 +1160,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/microcosm-cc/bluemonday v1.0.24 h1:NGQoPtwGVcbGkKfvyYk1yRqknzBuoMiUrO6R7uFTPlw=
github.com/microcosm-cc/bluemonday v1.0.24/go.mod h1:ArQySAMps0790cHSkdPEJ7bGkF2VePWH773hsJNSHf8=
github.com/microsoft/durabletask-go v0.4.0 h1:rGqKRZYyvxBaD/UIfVUnlGqrycqBg30Ngpt0ODcIzqY=
github.com/microsoft/durabletask-go v0.4.0/go.mod h1:svScWPnRqjf9YgxeCB3CkYLMAyvuu+qqNf4Hl9dmvcg=
github.com/microsoft/durabletask-go v0.4.1-0.20231222205218-fc5756737d3c h1:eng0MBn0jE65oWhzHbCugaGs6JetgfkdVW36nIJ0vgI=
github.com/microsoft/durabletask-go v0.4.1-0.20231222205218-fc5756737d3c/go.mod h1:OSZ4K7SgqBEsaouk3lAVdDzvanIzsdj7angZ0FTeSAU=
github.com/microsoft/go-mssqldb v1.6.0 h1:mM3gYdVwEPFrlg/Dvr2DNVEgYFG7L42l+dGc67NNNpc=
github.com/microsoft/go-mssqldb v1.6.0/go.mod h1:00mDtPbeQCRGC1HwOOR5K/gr30P1NcEG0vx6Kbv2aJU=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down
1 change: 1 addition & 0 deletions pkg/api/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const (
workflowComponent = "workflowComponent"
workflowName = "workflowName"
instanceID = "instanceID"
nonRecursive = "non_recursive"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this recursive instead of non_recursive? It's harder for people to understand negatives.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only concern I have here is, in general query string default parameters when not specified are the default values like false, 0 etc.
By defaulting to recursive terminate/purge and then asking users to set ?recursive=false does not seem intuitive to me, rather at that point setting ?non_recursive=true make it more intuitive IMO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of incline towards recursive set to true by default. Understanding this was quite easier for me.
Is there any reasoning behind keeping all flags to false by default, apart from general practice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally agree with Mukundan that it's more intuitive when the default value of a boolean is false.

If the default value is "true", then you are introducing 3 states: set true, set false, null.

eventName = "eventName"
consistencyParam = "consistency"
concurrencyParam = "concurrency"
Expand Down
46 changes: 46 additions & 0 deletions pkg/api/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2821,6 +2821,30 @@ func TestV1Beta1Workflow(t *testing.T) {
assert.Nil(t, resp.ErrorBody)
})

t.Run("Terminate with non_recursive set to false", func(t *testing.T) {
// This has the same behavior as the case when non_recursive parameter is not set.
// This is because default case is set to recursive termination.

apiPath := "v1.0-beta1/workflows/dapr/instanceID/terminate?non_recursive=false"

resp := fakeServer.DoRequest("POST", apiPath, nil, nil)
assert.Equal(t, 202, resp.StatusCode)

// assert
assert.Nil(t, resp.ErrorBody)
})

t.Run("Terminate with non_recursive true", func(t *testing.T) {
// Note that in case of non_recursive true, MockWorkflow intentionally returns fake error, even when it is not an actual error.
// This is to test that non_recursive flag is being passed correctly to the workflow component.

apiPath := "v1.0-beta1/workflows/dapr/instanceID/terminate?non_recursive=true"
resp := fakeServer.DoRequest("POST", apiPath, nil, nil)
assert.Equal(t, 500, resp.StatusCode)
assert.Equal(t, "ERR_TERMINATE_WORKFLOW", resp.ErrorBody["errorCode"])
assert.Equal(t, fmt.Sprintf(messages.ErrTerminateWorkflow.Message(), "instanceID", daprt.ErrFakeWorkflowNonRecursiveTerminateError), resp.ErrorBody["message"])
})

///////////////////////////
// RAISE EVENT API TESTS //
///////////////////////////
Expand Down Expand Up @@ -2931,6 +2955,28 @@ func TestV1Beta1Workflow(t *testing.T) {
// assert
assert.Nil(t, resp.ErrorBody)
})

t.Run("Purge with non_recursive false", func(t *testing.T) {
// This has the same behavior as the case when non_recursive parameter is not set.
// This is because default case is set to recursive purge.

apiPath := "v1.0-beta1/workflows/dapr/instanceID/purge?non_recursive=false"
resp := fakeServer.DoRequest("POST", apiPath, nil, nil)
assert.Equal(t, 202, resp.StatusCode)

// assert
assert.Nil(t, resp.ErrorBody)
})
t.Run("Purge with non_recursive true", func(t *testing.T) {
// Note that in case of non_recursive true, MockWorkflow intentionally returns fake error, even when it is not an actual error.
// This is to test that non_recursive flag is being passed correctly to the workflow component.

apiPath := "v1.0-beta1/workflows/dapr/instanceID/purge?non_recursive=true"
resp := fakeServer.DoRequest("POST", apiPath, nil, nil)
assert.Equal(t, 500, resp.StatusCode)
assert.Equal(t, "ERR_PURGE_WORKFLOW", resp.ErrorBody["errorCode"])
assert.Equal(t, fmt.Sprintf(messages.ErrPurgeWorkflow.Message(), "instanceID", daprt.ErrFakeWorkflowNonRecurisvePurgeError), resp.ErrorBody["message"])
})
}

func buildHTTPPineline(spec config.PipelineSpec) httpMiddleware.Pipeline {
Expand Down
33 changes: 31 additions & 2 deletions pkg/api/http/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import (
"io"
"net/http"
"strconv"

"github.com/go-chi/chi/v5"
"github.com/google/uuid"
Expand Down Expand Up @@ -238,7 +239,21 @@
return UniversalHTTPHandler(
a.universal.TerminateWorkflowBeta1,
UniversalHTTPHandlerOpts[*runtimev1pb.TerminateWorkflowRequest, *emptypb.Empty]{
InModifier: workflowInModifier[*runtimev1pb.TerminateWorkflowRequest],
InModifier: func(r *http.Request, in *runtimev1pb.TerminateWorkflowRequest) (*runtimev1pb.TerminateWorkflowRequest, error) {
in.SetWorkflowComponent(chi.URLParam(r, workflowComponent))
in.SetInstanceId(chi.URLParam(r, instanceID))

// Extract non_recursive option from query string
nonRecursive := r.URL.Query().Get(nonRecursive)
if nonRecursive != "" {
var err error
in.NonRecursive, err = strconv.ParseBool(nonRecursive)
if err != nil {
return nil, err

Check warning on line 252 in pkg/api/http/workflow.go

View check run for this annotation

Codecov / codecov/patch

pkg/api/http/workflow.go#L252

Added line #L252 was not covered by tests
}
}
return in, nil
},
SuccessStatusCode: http.StatusAccepted,
})
}
Expand Down Expand Up @@ -292,7 +307,21 @@
return UniversalHTTPHandler(
a.universal.PurgeWorkflowBeta1,
UniversalHTTPHandlerOpts[*runtimev1pb.PurgeWorkflowRequest, *emptypb.Empty]{
InModifier: workflowInModifier[*runtimev1pb.PurgeWorkflowRequest],
InModifier: func(r *http.Request, in *runtimev1pb.PurgeWorkflowRequest) (*runtimev1pb.PurgeWorkflowRequest, error) {
in.SetWorkflowComponent(chi.URLParam(r, workflowComponent))
in.SetInstanceId(chi.URLParam(r, instanceID))

// Extract non_recursive option from query string
nonRecursive := r.URL.Query().Get(nonRecursive)
if nonRecursive != "" {
var err error
in.NonRecursive, err = strconv.ParseBool(nonRecursive)
if err != nil {
return nil, err

Check warning on line 320 in pkg/api/http/workflow.go

View check run for this annotation

Codecov / codecov/patch

pkg/api/http/workflow.go#L320

Added line #L320 was not covered by tests
}
}
return in, nil
},
SuccessStatusCode: http.StatusAccepted,
})
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/universal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@

req := &workflows.TerminateRequest{
InstanceID: in.GetInstanceId(),
Recursive: !in.GetNonRecursive(),
}
if err := workflowComponent.Terminate(ctx, req); err != nil {
if errors.Is(err, api.ErrInstanceNotFound) {
Expand Down Expand Up @@ -247,6 +248,7 @@

req := workflows.PurgeRequest{
InstanceID: in.GetInstanceId(),
Recursive: !in.GetNonRecursive(),

Check warning on line 251 in pkg/api/universal/workflow.go

View check run for this annotation

Codecov / codecov/patch

pkg/api/universal/workflow.go#L251

Added line #L251 was not covered by tests
}

err = workflowComponent.Purge(ctx, &req)
Expand Down