Skip to content

Commit

Permalink
Renamed ActivityOptionsWithDefault to MergedActivityOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev committed Apr 18, 2024
1 parent 51249bd commit 45f1f41
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,39 @@
import java.util.HashMap;
import java.util.Map;

public final class ActivityOptionsWithDefault {
/**
* The chain of ActivityOptions and per type options maps. Used to merge options specified at the
* following layers:
*
* <pre>
* * WorkflowImplementationOptions
* * Workflow
* * ActivityStub
* </pre>
*
* Each next layer overrides specific options specified at the previous layer.
*/
public final class MergedActivityOptions {

/** Common options across all activity types. */
private ActivityOptions defaultOptions;

/** Per activity type options. These override defaultOptions. */
private Map<String, ActivityOptions> optionsMap;

private final ActivityOptionsWithDefault overridden;
/** The options specified at the previous layer. They are overriden by this object. */
private final MergedActivityOptions overridden;

public ActivityOptionsWithDefault(
ActivityOptionsWithDefault overridden,
public MergedActivityOptions(
MergedActivityOptions overridden,
ActivityOptions defaultOptions,
Map<String, ActivityOptions> optionsMap) {
this.overridden = overridden;
this.defaultOptions = defaultOptions;
this.optionsMap = new HashMap<>(optionsMap);
}

public ActivityOptionsWithDefault(ActivityOptionsWithDefault overridden) {
public MergedActivityOptions(MergedActivityOptions overridden) {
this.overridden = overridden;
defaultOptions = null;
optionsMap = null;
Expand All @@ -34,6 +50,7 @@ public void setOptionsMap(Map<String, ActivityOptions> optionsMap) {
this.optionsMap = optionsMap;
}

/** Get merged options for the given activityType. */
public ActivityOptions getMergedOptions(String activityType) {
ActivityOptions overrideOptions = overridden.getMergedOptions(activityType);
return merge(overrideOptions, defaultOptions, optionsMap.get(activityType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.temporal.activity.ActivityOptions;
import io.temporal.common.MethodRetry;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.internal.common.ActivityOptionsWithDefault;
import io.temporal.internal.common.MergedActivityOptions;
import io.temporal.workflow.ActivityStub;
import io.temporal.workflow.Functions;
import java.lang.reflect.InvocationHandler;
Expand All @@ -33,14 +33,14 @@

@VisibleForTesting
public class ActivityInvocationHandler extends ActivityInvocationHandlerBase {
private final ActivityOptionsWithDefault options;
private final MergedActivityOptions options;
private final WorkflowOutboundCallsInterceptor activityExecutor;
private final Functions.Proc assertReadOnly;

@VisibleForTesting
public static InvocationHandler newInstance(
Class<?> activityInterface,
ActivityOptionsWithDefault options,
MergedActivityOptions options,
WorkflowOutboundCallsInterceptor activityExecutor,
Functions.Proc assertReadOnly) {
return new ActivityInvocationHandler(
Expand All @@ -50,7 +50,7 @@ public static InvocationHandler newInstance(
private ActivityInvocationHandler(
Class<?> activityInterface,
WorkflowOutboundCallsInterceptor activityExecutor,
ActivityOptionsWithDefault options,
MergedActivityOptions options,
Functions.Proc assertReadOnly) {
super(activityInterface);
this.options = options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.failure.*;
import io.temporal.internal.common.ActivityOptionUtils;
import io.temporal.internal.common.ActivityOptionsWithDefault;
import io.temporal.internal.common.HeaderUtils;
import io.temporal.internal.common.MergedActivityOptions;
import io.temporal.internal.common.OptionsUtils;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.SdkFlag;
Expand Down Expand Up @@ -120,14 +120,7 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall
private WorkflowInboundCallsInterceptor headInboundInterceptor;
private WorkflowOutboundCallsInterceptor headOutboundInterceptor;

/** Activity options specified through {@link WorkflowImplementationOptions}. */
private ActivityOptionsWithDefault workflowImplementationOptionsActivityOptions;

/**
* Activity options specified through {@link Workflow#setDefaultActivityOptions(ActivityOptions)}
* and {@link Workflow#applyActivityOptions(Map)}
*/
private ActivityOptionsWithDefault workflowActivityOptions;
private final MergedActivityOptions activityOptions;

private LocalActivityOptions defaultLocalActivityOptions = null;
private Map<String, LocalActivityOptions> localActivityOptionsMap;
Expand All @@ -152,9 +145,10 @@ public SyncWorkflowContext(
this.signalDispatcher = signalDispatcher;
this.queryDispatcher = queryDispatcher;
this.updateDispatcher = updateDispatcher;
MergedActivityOptions activityOptionsFromWorkflowImplementationOptions = null;
if (workflowImplementationOptions != null) {
this.workflowImplementationOptionsActivityOptions =
new ActivityOptionsWithDefault(
activityOptionsFromWorkflowImplementationOptions =
new MergedActivityOptions(
null,
workflowImplementationOptions.getDefaultActivityOptions(),
workflowImplementationOptions.getActivityOptions());
Expand All @@ -163,6 +157,8 @@ public SyncWorkflowContext(
this.localActivityOptionsMap =
new HashMap<>(workflowImplementationOptions.getLocalActivityOptions());
}
this.activityOptions =
new MergedActivityOptions(activityOptionsFromWorkflowImplementationOptions);
this.workflowImplementationOptions =
workflowImplementationOptions == null
? WorkflowImplementationOptions.getDefaultInstance()
Expand Down Expand Up @@ -211,8 +207,8 @@ public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head
updateDispatcher.setInboundCallsInterceptor(head);
}

public ActivityOptionsWithDefault getActivityOptions() {
return workflowImplementationOptionsActivityOptions;
public MergedActivityOptions getActivityOptions() {
return activityOptions;
}

public LocalActivityOptions getDefaultLocalActivityOptions() {
Expand All @@ -226,11 +222,11 @@ public LocalActivityOptions getDefaultLocalActivityOptions() {
}

public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
workflowActivityOptions.setDefaultOptions(defaultActivityOptions);
activityOptions.setDefaultOptions(defaultActivityOptions);
}

public void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOption) {
workflowActivityOptions.setOptionsMap(activityTypeToOption);
activityOptions.setOptionsMap(activityTypeToOption);
}

public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
import io.temporal.internal.WorkflowThreadMarker;
import io.temporal.internal.common.ActivityOptionUtils;
import io.temporal.internal.common.ActivityOptionsWithDefault;
import io.temporal.internal.common.MergedActivityOptions;
import io.temporal.internal.common.NonIdempotentHandle;
import io.temporal.internal.common.SearchAttributesUtil;
import io.temporal.internal.logging.ReplayAwareLogger;
Expand Down Expand Up @@ -301,14 +301,13 @@ public static <T> T newActivityStub(
// Merge the activity options we may have received from the workflow with the options we may
// have received in WorkflowImplementationOptions.
SyncWorkflowContext context = getRootWorkflowContext();
ActivityOptionsWithDefault optionsWithDefault =
new ActivityOptionsWithDefault(
context.getActivityOptions(), options, activityMethodOptions);
MergedActivityOptions activityOptions =
new MergedActivityOptions(context.getActivityOptions(), options, activityMethodOptions);

InvocationHandler invocationHandler =
ActivityInvocationHandler.newInstance(
activityInterface,
optionsWithDefault,
activityOptions,
context.getWorkflowOutboundInterceptor(),
() -> assertNotReadOnly("schedule activity"));
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import io.temporal.internal.activity.ActivityExecutionContextFactory;
import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl;
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
import io.temporal.internal.common.ActivityOptionsWithDefault;
import io.temporal.internal.common.MergedActivityOptions;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.sync.*;
import io.temporal.internal.testservice.InProcessGRPCServer;
Expand Down Expand Up @@ -189,11 +189,10 @@ public <T> T newActivityStub(Class<T> activityInterface) {
.setScheduleToCloseTimeout(Duration.ofDays(1))
.setHeartbeatTimeout(Duration.ofSeconds(1))
.build();
ActivityOptionsWithDefault optionsWithDefault =
new ActivityOptionsWithDefault(null, options, null);
MergedActivityOptions activityOptions = new MergedActivityOptions(null, options, null);
InvocationHandler invocationHandler =
ActivityInvocationHandler.newInstance(
activityInterface, optionsWithDefault, new TestActivityExecutor(), () -> {});
activityInterface, activityOptions, new TestActivityExecutor(), () -> {});
invocationHandler =
new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
Expand All @@ -207,12 +206,11 @@ public <T> T newActivityStub(Class<T> activityInterface) {
*/
@Override
public <T> T newActivityStub(Class<T> activityInterface, ActivityOptions options) {
ActivityOptionsWithDefault optionsWithDefault =
new ActivityOptionsWithDefault(null, options, null);
MergedActivityOptions activityOptions = new MergedActivityOptions(null, options, null);

InvocationHandler invocationHandler =
ActivityInvocationHandler.newInstance(
activityInterface, optionsWithDefault, new TestActivityExecutor(), () -> {});
activityInterface, activityOptions, new TestActivityExecutor(), () -> {});
invocationHandler =
new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
Expand Down

0 comments on commit 45f1f41

Please sign in to comment.