Skip to content

Commit

Permalink
Cascade terminate/Purge Workflow Support (#7340)
Browse files Browse the repository at this point in the history
* Updating protos

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* enable recursive terminate/purge

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Updating dtf-go

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* updating contrib

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Correct proto generated files

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* update comment

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* implementing getOrchestrationRuntimeState

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Adding unit tests

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* make modtidy-all

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Adding recursive option in query parameter

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Adding integration test for workflow

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* fix bug in Creating sub-orchestrations

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Setting recursive/terminate purge to default and adding tests for same

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* linter fixes

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* Removing fix from this PR

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* renaming  to

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* linter fixes

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

---------

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Loong Dai <long.dai@intel.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
  • Loading branch information
4 people committed Jan 17, 2024
1 parent 994b037 commit f9a2088
Show file tree
Hide file tree
Showing 13 changed files with 686 additions and 373 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ test-integration: test-deps
################################################################################
# Target: lint #
################################################################################
# Please use golangci-lint version v1.51.2 , otherwise you might encounter errors.
# You can download version v1.51.2 at https://github.com/golangci/golangci-lint/releases/tag/v1.51.2
# Please use golangci-lint version v1.55.2 , otherwise you might encounter errors.
# You can download version v1.55.2 at https://github.com/golangci/golangci-lint/releases/tag/v1.55.2
.PHONY: lint
lint: check-linter
$(GOLANGCI_LINT) run --build-tags=$(GOLANGCI_LINT_TAGS) --timeout=20m
Expand Down
4 changes: 4 additions & 0 deletions dapr/proto/runtime/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,8 @@ message TerminateWorkflowRequest {
string instance_id = 1 [json_name = "instanceID"];
// Name of the workflow component.
string workflow_component = 2 [json_name = "workflowComponent"];
// Indicates whether this is a non_recursive terminate request
bool non_recursive = 3 [json_name = "non_recursive"];
}

// PauseWorkflowRequest is the request for PauseWorkflowBeta1.
Expand Down Expand Up @@ -1102,6 +1104,8 @@ message PurgeWorkflowRequest {
string instance_id = 1 [json_name = "instanceID"];
// Name of the workflow component.
string workflow_component = 2 [json_name = "workflowComponent"];
// Indicates whether this is a non_recursive purge request
bool non_recursive = 3 [json_name = "non_recursive"];
}

// ShutdownRequest is the request for Shutdown.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/jhump/protoreflect v1.15.2
github.com/kelseyhightower/envconfig v1.4.0
github.com/lestrrat-go/jwx/v2 v2.0.18
github.com/microsoft/durabletask-go v0.4.0
github.com/microsoft/durabletask-go v0.4.1-0.20231222205218-fc5756737d3c
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/prometheus/client_golang v1.16.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1160,8 +1160,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/microcosm-cc/bluemonday v1.0.24 h1:NGQoPtwGVcbGkKfvyYk1yRqknzBuoMiUrO6R7uFTPlw=
github.com/microcosm-cc/bluemonday v1.0.24/go.mod h1:ArQySAMps0790cHSkdPEJ7bGkF2VePWH773hsJNSHf8=
github.com/microsoft/durabletask-go v0.4.0 h1:rGqKRZYyvxBaD/UIfVUnlGqrycqBg30Ngpt0ODcIzqY=
github.com/microsoft/durabletask-go v0.4.0/go.mod h1:svScWPnRqjf9YgxeCB3CkYLMAyvuu+qqNf4Hl9dmvcg=
github.com/microsoft/durabletask-go v0.4.1-0.20231222205218-fc5756737d3c h1:eng0MBn0jE65oWhzHbCugaGs6JetgfkdVW36nIJ0vgI=
github.com/microsoft/durabletask-go v0.4.1-0.20231222205218-fc5756737d3c/go.mod h1:OSZ4K7SgqBEsaouk3lAVdDzvanIzsdj7angZ0FTeSAU=
github.com/microsoft/go-mssqldb v1.6.0 h1:mM3gYdVwEPFrlg/Dvr2DNVEgYFG7L42l+dGc67NNNpc=
github.com/microsoft/go-mssqldb v1.6.0/go.mod h1:00mDtPbeQCRGC1HwOOR5K/gr30P1NcEG0vx6Kbv2aJU=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down
1 change: 1 addition & 0 deletions pkg/api/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const (
workflowComponent = "workflowComponent"
workflowName = "workflowName"
instanceID = "instanceID"
nonRecursive = "non_recursive"
eventName = "eventName"
consistencyParam = "consistency"
concurrencyParam = "concurrency"
Expand Down
46 changes: 46 additions & 0 deletions pkg/api/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2821,6 +2821,30 @@ func TestV1Beta1Workflow(t *testing.T) {
assert.Nil(t, resp.ErrorBody)
})

t.Run("Terminate with non_recursive set to false", func(t *testing.T) {
// This has the same behavior as the case when non_recursive parameter is not set.
// This is because default case is set to recursive termination.

apiPath := "v1.0-beta1/workflows/dapr/instanceID/terminate?non_recursive=false"

resp := fakeServer.DoRequest("POST", apiPath, nil, nil)
assert.Equal(t, 202, resp.StatusCode)

// assert
assert.Nil(t, resp.ErrorBody)
})

t.Run("Terminate with non_recursive true", func(t *testing.T) {
// Note that in case of non_recursive true, MockWorkflow intentionally returns fake error, even when it is not an actual error.
// This is to test that non_recursive flag is being passed correctly to the workflow component.

apiPath := "v1.0-beta1/workflows/dapr/instanceID/terminate?non_recursive=true"
resp := fakeServer.DoRequest("POST", apiPath, nil, nil)
assert.Equal(t, 500, resp.StatusCode)
assert.Equal(t, "ERR_TERMINATE_WORKFLOW", resp.ErrorBody["errorCode"])
assert.Equal(t, fmt.Sprintf(messages.ErrTerminateWorkflow.Message(), "instanceID", daprt.ErrFakeWorkflowNonRecursiveTerminateError), resp.ErrorBody["message"])
})

///////////////////////////
// RAISE EVENT API TESTS //
///////////////////////////
Expand Down Expand Up @@ -2931,6 +2955,28 @@ func TestV1Beta1Workflow(t *testing.T) {
// assert
assert.Nil(t, resp.ErrorBody)
})

t.Run("Purge with non_recursive false", func(t *testing.T) {
// This has the same behavior as the case when non_recursive parameter is not set.
// This is because default case is set to recursive purge.

apiPath := "v1.0-beta1/workflows/dapr/instanceID/purge?non_recursive=false"
resp := fakeServer.DoRequest("POST", apiPath, nil, nil)
assert.Equal(t, 202, resp.StatusCode)

// assert
assert.Nil(t, resp.ErrorBody)
})
t.Run("Purge with non_recursive true", func(t *testing.T) {
// Note that in case of non_recursive true, MockWorkflow intentionally returns fake error, even when it is not an actual error.
// This is to test that non_recursive flag is being passed correctly to the workflow component.

apiPath := "v1.0-beta1/workflows/dapr/instanceID/purge?non_recursive=true"
resp := fakeServer.DoRequest("POST", apiPath, nil, nil)
assert.Equal(t, 500, resp.StatusCode)
assert.Equal(t, "ERR_PURGE_WORKFLOW", resp.ErrorBody["errorCode"])
assert.Equal(t, fmt.Sprintf(messages.ErrPurgeWorkflow.Message(), "instanceID", daprt.ErrFakeWorkflowNonRecurisvePurgeError), resp.ErrorBody["message"])
})
}

func buildHTTPPineline(spec config.PipelineSpec) httpMiddleware.Pipeline {
Expand Down
33 changes: 31 additions & 2 deletions pkg/api/http/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package http
import (
"io"
"net/http"
"strconv"

"github.com/go-chi/chi/v5"
"github.com/google/uuid"
Expand Down Expand Up @@ -238,7 +239,21 @@ func (a *api) onTerminateWorkflowHandler() http.HandlerFunc {
return UniversalHTTPHandler(
a.universal.TerminateWorkflowBeta1,
UniversalHTTPHandlerOpts[*runtimev1pb.TerminateWorkflowRequest, *emptypb.Empty]{
InModifier: workflowInModifier[*runtimev1pb.TerminateWorkflowRequest],
InModifier: func(r *http.Request, in *runtimev1pb.TerminateWorkflowRequest) (*runtimev1pb.TerminateWorkflowRequest, error) {
in.SetWorkflowComponent(chi.URLParam(r, workflowComponent))
in.SetInstanceId(chi.URLParam(r, instanceID))

// Extract non_recursive option from query string
nonRecursive := r.URL.Query().Get(nonRecursive)
if nonRecursive != "" {
var err error
in.NonRecursive, err = strconv.ParseBool(nonRecursive)
if err != nil {
return nil, err
}
}
return in, nil
},
SuccessStatusCode: http.StatusAccepted,
})
}
Expand Down Expand Up @@ -292,7 +307,21 @@ func (a *api) onPurgeWorkflowHandler() http.HandlerFunc {
return UniversalHTTPHandler(
a.universal.PurgeWorkflowBeta1,
UniversalHTTPHandlerOpts[*runtimev1pb.PurgeWorkflowRequest, *emptypb.Empty]{
InModifier: workflowInModifier[*runtimev1pb.PurgeWorkflowRequest],
InModifier: func(r *http.Request, in *runtimev1pb.PurgeWorkflowRequest) (*runtimev1pb.PurgeWorkflowRequest, error) {
in.SetWorkflowComponent(chi.URLParam(r, workflowComponent))
in.SetInstanceId(chi.URLParam(r, instanceID))

// Extract non_recursive option from query string
nonRecursive := r.URL.Query().Get(nonRecursive)
if nonRecursive != "" {
var err error
in.NonRecursive, err = strconv.ParseBool(nonRecursive)
if err != nil {
return nil, err
}
}
return in, nil
},
SuccessStatusCode: http.StatusAccepted,
})
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/universal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (a *Universal) TerminateWorkflowBeta1(ctx context.Context, in *runtimev1pb.

req := &workflows.TerminateRequest{
InstanceID: in.GetInstanceId(),
Recursive: !in.GetNonRecursive(),
}
if err := workflowComponent.Terminate(ctx, req); err != nil {
if errors.Is(err, api.ErrInstanceNotFound) {
Expand Down Expand Up @@ -247,6 +248,7 @@ func (a *Universal) PurgeWorkflowBeta1(ctx context.Context, in *runtimev1pb.Purg

req := workflows.PurgeRequest{
InstanceID: in.GetInstanceId(),
Recursive: !in.GetNonRecursive(),
}

err = workflowComponent.Purge(ctx, &req)
Expand Down

0 comments on commit f9a2088

Please sign in to comment.