Skip to content

Commit

Permalink
minor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
kaibocai committed Dec 15, 2023
1 parent 8e89c1f commit 0cecc57
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 41 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -456,4 +456,4 @@ replace github.com/microcosm-cc/bluemonday => github.com/microcosm-cc/bluemonday
// Then, run `make modtidy` in this repository.
// This ensures that go.mod and go.sum are up-to-date.

replace github.com/microsoft/durabletask-go v0.3.1 => github.com/microsoft/durabletask-go v0.3.2-0.20231215202319-6ccd188e08eb
replace github.com/microsoft/durabletask-go v0.3.1 => github.com/microsoft/durabletask-go v0.3.2-0.20231215215359-12222afe377a
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1128,8 +1128,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/microcosm-cc/bluemonday v1.0.24 h1:NGQoPtwGVcbGkKfvyYk1yRqknzBuoMiUrO6R7uFTPlw=
github.com/microcosm-cc/bluemonday v1.0.24/go.mod h1:ArQySAMps0790cHSkdPEJ7bGkF2VePWH773hsJNSHf8=
github.com/microsoft/durabletask-go v0.3.2-0.20231215202319-6ccd188e08eb h1:nIhpoQv9AQfMiR2QiTIB7c+cqVgcGLqN5cnyoOSBUcc=
github.com/microsoft/durabletask-go v0.3.2-0.20231215202319-6ccd188e08eb/go.mod h1:svScWPnRqjf9YgxeCB3CkYLMAyvuu+qqNf4Hl9dmvcg=
github.com/microsoft/durabletask-go v0.3.2-0.20231215215359-12222afe377a h1:QN7LXuSgB9vKs4bTUYfDpJULKMkw8Lld8lZJuMQIWao=
github.com/microsoft/durabletask-go v0.3.2-0.20231215215359-12222afe377a/go.mod h1:svScWPnRqjf9YgxeCB3CkYLMAyvuu+qqNf4Hl9dmvcg=
github.com/microsoft/go-mssqldb v1.6.0 h1:mM3gYdVwEPFrlg/Dvr2DNVEgYFG7L42l+dGc67NNNpc=
github.com/microsoft/go-mssqldb v1.6.0/go.mod h1:00mDtPbeQCRGC1HwOOR5K/gr30P1NcEG0vx6Kbv2aJU=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down
13 changes: 8 additions & 5 deletions pkg/runtime/wfengine/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,22 @@ func (be *actorBackend) CreateOrchestrationInstance(ctx context.Context, e *back
return err
}

policyAndEventData := PolicyAndEventData{
Policy: policy,
EventData: eventData,
createWorkflowInstanceRequest := CreateWorkflowInstanceRequest{
Policy: policy,
StartEventBytes: eventData,
}

policyAndEventDataBytes, _ := json.Marshal(policyAndEventData)
requestBytes, err := json.Marshal(createWorkflowInstanceRequest)
if err != nil {
return fmt.Errorf("failed to marshal createWorkflowInstanceRequest: %w", err)

Check warning on line 161 in pkg/runtime/wfengine/backend.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/backend.go#L161

Added line #L161 was not covered by tests
}

// Invoke the well-known workflow actor directly, which will be created by this invocation
// request. Note that this request goes directly to the actor runtime, bypassing the API layer.
req := invokev1.
NewInvokeMethodRequest(CreateWorkflowInstanceMethod).
WithActor(be.config.workflowActorType, workflowInstanceID).
WithRawDataBytes(policyAndEventDataBytes).
WithRawDataBytes(requestBytes).
WithContentType(invokev1.OctetStreamContentType)
defer req.Close()

Expand Down
14 changes: 7 additions & 7 deletions pkg/runtime/wfengine/wfengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ func TestSingleActivityWorkflow_ReuseInstanceIDIgnore(t *testing.T) {

for _, opt := range GetTestOptions() {
suffix := opt(engine)
t.Run(opt(engine), func(t *testing.T) {
t.Run(suffix, func(t *testing.T) {
instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED_" + suffix)
reuseIDPolicy := &api.OrchestrationIdReusePolicy{
Action: api.IGNORE,
OperationStatus: []api.OrchestrationStatus{api.RUNNING, api.COMPLETED, api.PENDING},
Action: api.REUSE_ID_ACTION_IGNORE,
OperationStatus: []api.OrchestrationStatus{api.RUNTIME_STATUS_RUNNING, api.RUNTIME_STATUS_COMPLETED, api.RUNTIME_STATUS_PENDING},
}

// Run the orchestration
Expand Down Expand Up @@ -225,11 +225,11 @@ func TestSingleActivityWorkflow_ReuseInstanceIDTerminate(t *testing.T) {

for _, opt := range GetTestOptions() {
suffix := opt(engine)
t.Run(opt(engine), func(t *testing.T) {
t.Run(suffix, func(t *testing.T) {
instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED_" + suffix)
reuseIDPolicy := &api.OrchestrationIdReusePolicy{
Action: api.TERMINATE,
OperationStatus: []api.OrchestrationStatus{api.RUNNING, api.COMPLETED, api.PENDING},
Action: api.REUSE_ID_ACTION_TERMINATE,
OperationStatus: []api.OrchestrationStatus{api.RUNTIME_STATUS_RUNNING, api.RUNTIME_STATUS_COMPLETED, api.RUNTIME_STATUS_PENDING},
}

// Run the orchestration
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestSingleActivityWorkflow_ReuseInstanceIDError(t *testing.T) {

for _, opt := range GetTestOptions() {
suffix := opt(engine)
t.Run(opt(engine), func(t *testing.T) {
t.Run(suffix, func(t *testing.T) {
instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED_" + suffix)
id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID))
require.NoError(t, err)
Expand Down
52 changes: 26 additions & 26 deletions pkg/runtime/wfengine/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ type recoverableError struct {
cause error
}

type PolicyAndEventData struct {
Policy *api.OrchestrationIdReusePolicy `json:"policy"`
EventData []byte `json:"eventData"`
type CreateWorkflowInstanceRequest struct {
Policy *api.OrchestrationIdReusePolicy `json:"policy"`
StartEventBytes []byte `json:"startEventBytes"`
}

// workflowScheduler is a func interface for pushing workflow (orchestration) work items into the backend
Expand Down Expand Up @@ -174,12 +174,12 @@ func (wf *workflowActor) createWorkflowInstance(ctx context.Context, actorID str
created = true
}

var policyAndEventData PolicyAndEventData
if err = json.Unmarshal(request, &policyAndEventData); err != nil {
return fmt.Errorf("failed to unmarshal policyAndEventData: %w", err)
var createWorkflowInstanceRequest CreateWorkflowInstanceRequest
if err = json.Unmarshal(request, &createWorkflowInstanceRequest); err != nil {
return fmt.Errorf("failed to unmarshal createWorkflowInstanceRequest: %w", err)

Check warning on line 179 in pkg/runtime/wfengine/workflow.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/workflow.go#L179

Added line #L179 was not covered by tests
}
reuseIDPolicy := policyAndEventData.Policy
startEventBytes := policyAndEventData.EventData
reuseIDPolicy := createWorkflowInstanceRequest.Policy
startEventBytes := createWorkflowInstanceRequest.StartEventBytes

// Ensure that the start event payload is a valid durabletask execution-started event
startEvent, err := backend.UnmarshalHistoryEvent(startEventBytes)
Expand All @@ -198,43 +198,43 @@ func (wf *workflowActor) createWorkflowInstance(ctx context.Context, actorID str

// orchestration didn't exist and just create it.
if created {
return wf.createInstance(ctx, actorID, startEvent, state)
return wf.scheduleWorkflowStart(ctx, actorID, startEvent, state)
}

// orchestration already exists, apply reuse id policy
runtimeState := getRuntimeState(actorID, state)
runtimeStatus := runtimeState.RuntimeStatus()
targetStatusValues := buildStatusSet(reuseIDPolicy.GetOperationStatus())
// if target status doesn't match, fall back to original logic, create instance only if previous one is completed
if _, ok := targetStatusValues[runtimeStatus]; !ok {
if !isStatusMatch(reuseIDPolicy.GetOperationStatus(), runtimeStatus) {
return wf.createIfCompleted(ctx, runtimeState, actorID, state, startEvent)
}

switch reuseIDPolicy.GetAction() {
case api.IGNORE:
case api.REUSE_ID_ACTION_IGNORE:
// Log an warning message and ignore creating new instance
wfLogger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", actorID)
return nil
case api.TERMINATE:
case api.REUSE_ID_ACTION_TERMINATE:
// terminate existing instance
if err := wf.cleanupOrchestrationStateInternal(ctx, actorID, state, false); err != nil {
if err := wf.cleanupOrchestrationStateInternal(ctx, actorID, state, runtimeStatus, false); err != nil {
return fmt.Errorf("failed to terminate existing instance with ID '%s'", actorID)

Check warning on line 220 in pkg/runtime/wfengine/workflow.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/workflow.go#L220

Added line #L220 was not covered by tests
}

// created a new instance
state.Reset()
return wf.createInstance(ctx, actorID, startEvent, state)
return wf.scheduleWorkflowStart(ctx, actorID, startEvent, state)
}
// default Action ERROR, fall back to original logic
return wf.createIfCompleted(ctx, runtimeState, actorID, state, startEvent)

Check warning on line 228 in pkg/runtime/wfengine/workflow.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/wfengine/workflow.go#L228

Added line #L228 was not covered by tests
}

func buildStatusSet(statuses []api.OrchestrationStatus) map[api.OrchestrationStatus]struct{} {
statusSet := make(map[api.OrchestrationStatus]struct{}, len(statuses))
func isStatusMatch(statuses []api.OrchestrationStatus, runtimeStatus api.OrchestrationStatus) bool {
for _, status := range statuses {
statusSet[status] = struct{}{}
if status == runtimeStatus {
return true
}
}
return statusSet
return false
}

func (wf *workflowActor) createIfCompleted(ctx context.Context, runtimeState *backend.OrchestrationRuntimeState, actorID string, state *workflowState, startEvent *backend.HistoryEvent) error {
Expand All @@ -248,10 +248,10 @@ func (wf *workflowActor) createIfCompleted(ctx context.Context, runtimeState *ba
}
wfLogger.Infof("Workflow actor '%s': workflow was previously completed and is being recreated", actorID)
state.Reset()
return wf.createInstance(ctx, actorID, startEvent, state)
return wf.scheduleWorkflowStart(ctx, actorID, startEvent, state)
}

func (wf *workflowActor) createInstance(ctx context.Context, actorID string, startEvent *backend.HistoryEvent, state *workflowState) error {
func (wf *workflowActor) scheduleWorkflowStart(ctx context.Context, actorID string, startEvent *backend.HistoryEvent, state *workflowState) error {
// Schedule a reminder to execute immediately after this operation. The reminder will trigger the actual
// workflow execution. This is preferable to using the current thread so that we don't block the client
// while the workflow logic is running.
Expand All @@ -264,10 +264,8 @@ func (wf *workflowActor) createInstance(ctx context.Context, actorID string, sta
}

// This method cleans up a workflow associated with the given actorID
func (wf *workflowActor) cleanupOrchestrationStateInternal(ctx context.Context, actorID string, state *workflowState, requireCompleted bool) error {
runtimeState := getRuntimeState(actorID, state)
runtimeStatus := runtimeState.RuntimeStatus()
isCompleted := runtimeStatus == api.COMPLETED || runtimeStatus == api.TERMINATED || runtimeStatus == api.FAILED
func (wf *workflowActor) cleanupOrchestrationStateInternal(ctx context.Context, actorID string, state *workflowState, runtimeStatus api.OrchestrationStatus, requireCompleted bool) error {
isCompleted := runtimeStatus == api.RUNTIME_STATUS_COMPLETED || runtimeStatus == api.RUNTIME_STATUS_TERMINATED || runtimeStatus == api.RUNTIME_STATUS_FAILED

// purge orchestration in ['COMPLETED', 'FAILED', 'TERMINATED']
if requireCompleted && !isCompleted {
Expand Down Expand Up @@ -334,7 +332,9 @@ func (wf *workflowActor) purgeWorkflowState(ctx context.Context, actorID string)
if state == nil {
return api.ErrInstanceNotFound
}
return wf.cleanupOrchestrationStateInternal(ctx, actorID, state, true)
runtimeState := getRuntimeState(actorID, state)
runtimeStatus := runtimeState.RuntimeStatus()
return wf.cleanupOrchestrationStateInternal(ctx, actorID, state, runtimeStatus, true)
}

func (wf *workflowActor) addWorkflowEvent(ctx context.Context, actorID string, historyEventBytes []byte) error {
Expand Down

0 comments on commit 0cecc57

Please sign in to comment.