Skip to content

Commit

Permalink
Fixed continue as new inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev committed Dec 29, 2023
1 parent 7ce3f8c commit f767030
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,7 @@ private void processFailWorkflowExecution(
if (startRequest.hasMemo()) {
continueAsNewAttr.setMemo(startRequest.getMemo());
}
// TODO
ContinueAsNewWorkflowExecutionCommandAttributes coninueAsNewCommand =
continueAsNewAttr.build();
workflow.action(Action.CONTINUE_AS_NEW, ctx, coninueAsNewCommand, workflowTaskCompletedId);
Expand All @@ -1344,7 +1345,7 @@ private void processFailWorkflowExecution(
service.continueAsNew(
startRequest,
coninueAsNewCommand,
continuedAsNewEventAttributes.getNewExecutionRunId(),
continuedAsNewEventAttributes,
continuedRetryState,
identity,
getExecutionId(),
Expand Down Expand Up @@ -1465,16 +1466,16 @@ private void startNewCronRun(
.setRetryPolicy(startRequest.getRetryPolicy())
.setLastCompletionResult(lastCompletionResult);
lastFailure.ifPresent(builder::setFailure);
ContinueAsNewWorkflowExecutionCommandAttributes continueAsNewAttr = builder.build();
workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewAttr, workflowTaskCompletedId);
ContinueAsNewWorkflowExecutionCommandAttributes continueAsNewCommandAttr = builder.build();
workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewCommandAttr, workflowTaskCompletedId);
workflowTaskStateMachine.getData().workflowCompleted = true;
HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes =
event.getWorkflowExecutionContinuedAsNewEventAttributes();
service.continueAsNew(
startRequest,
continueAsNewAttr,
continuedAsNewEventAttributes.getNewExecutionRunId(),
continueAsNewCommandAttr,
continuedAsNewEventAttributes,
Optional.empty(),
identity,
getExecutionId(),
Expand Down Expand Up @@ -1530,7 +1531,7 @@ private void processContinueAsNewWorkflowExecution(
service.continueAsNew(
startRequest,
d,
event.getWorkflowExecutionContinuedAsNewEventAttributes().getNewExecutionRunId(),
event.getWorkflowExecutionContinuedAsNewEventAttributes(),
workflow.getData().retryState,
identity,
getExecutionId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
import io.temporal.api.testservice.v1.SleepRequest;
Expand Down Expand Up @@ -940,8 +941,8 @@ public void signalExternalWorkflowExecution(
*/
public String continueAsNew(
StartWorkflowExecutionRequest previousRunStartRequest,
ContinueAsNewWorkflowExecutionCommandAttributes a,
String newExecutionRunId,
ContinueAsNewWorkflowExecutionCommandAttributes ca,
WorkflowExecutionContinuedAsNewEventAttributes ea,
Optional<TestServiceRetryState> retryState,
String identity,
ExecutionId continuedExecutionId,
Expand All @@ -951,11 +952,11 @@ public String continueAsNew(
StartWorkflowExecutionRequest.Builder startRequestBuilder =
StartWorkflowExecutionRequest.newBuilder()
.setRequestId(UUID.randomUUID().toString())
.setWorkflowType(a.getWorkflowType())
.setWorkflowRunTimeout(a.getWorkflowRunTimeout())
.setWorkflowTaskTimeout(a.getWorkflowTaskTimeout())
.setWorkflowType(ea.getWorkflowType())
.setWorkflowRunTimeout(ea.getWorkflowRunTimeout())
.setWorkflowTaskTimeout(ea.getWorkflowTaskTimeout())
.setNamespace(continuedExecutionId.getNamespace())
.setTaskQueue(a.getTaskQueue())
.setTaskQueue(ea.getTaskQueue())
.setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
.setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
.setIdentity(identity)
Expand All @@ -965,31 +966,31 @@ public String continueAsNew(
// if (previousRunStartRequest.hasRetryPolicy()) {
// startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
// }
if (a.hasRetryPolicy()) {
startRequestBuilder.setRetryPolicy(a.getRetryPolicy());
if (ca.hasRetryPolicy()) {
startRequestBuilder.setRetryPolicy(ca.getRetryPolicy());
}
if (a.hasInput()) {
startRequestBuilder.setInput(a.getInput());
if (ea.hasInput()) {
startRequestBuilder.setInput(ea.getInput());
}
if (a.hasHeader()) {
startRequestBuilder.setHeader(a.getHeader());
if (ea.hasHeader()) {
startRequestBuilder.setHeader(ea.getHeader());
}
StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
lock.lock();
Optional<Failure> lastFail =
a.hasFailure()
? Optional.of(a.getFailure())
ea.hasFailure()
? Optional.of(ea.getFailure())
: retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
try {
StartWorkflowExecutionResponse response =
startWorkflowExecutionNoRunningCheckLocked(
startRequest,
newExecutionRunId,
ea.getNewExecutionRunId(),
firstExecutionRunId,
Optional.of(continuedExecutionId.getExecution().getRunId()),
retryState,
ProtobufTimeUtils.toJavaDuration(a.getBackoffStartInterval()),
a.getLastCompletionResult(),
ProtobufTimeUtils.toJavaDuration(ea.getBackoffStartInterval()),
ea.getLastCompletionResult(),
lastFail,
parent,
parentChildInitiatedEventId,
Expand Down

0 comments on commit f767030

Please sign in to comment.