Skip to content

Commit

Permalink
Refactored ActivityOptions merging
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev committed Apr 18, 2024
1 parent ed211fa commit 51249bd
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public Builder setVersioningIntent(VersioningIntent versioningIntent) {
return this;
}

/** Merge options with override parameter having higher precedence. */
public Builder mergeActivityOptions(ActivityOptions override) {
if (override == null) {
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.temporal.internal.common;

import io.temporal.activity.ActivityOptions;
import java.util.HashMap;
import java.util.Map;

public final class ActivityOptionsWithDefault {
private ActivityOptions defaultOptions;

private Map<String, ActivityOptions> optionsMap;

private final ActivityOptionsWithDefault overridden;

public ActivityOptionsWithDefault(
ActivityOptionsWithDefault overridden,
ActivityOptions defaultOptions,
Map<String, ActivityOptions> optionsMap) {
this.overridden = overridden;
this.defaultOptions = defaultOptions;
this.optionsMap = new HashMap<>(optionsMap);
}

public ActivityOptionsWithDefault(ActivityOptionsWithDefault overridden) {
this.overridden = overridden;
defaultOptions = null;
optionsMap = null;
}

public void setDefaultOptions(ActivityOptions defaultOptions) {
this.defaultOptions = defaultOptions;
}

public void setOptionsMap(Map<String, ActivityOptions> optionsMap) {
this.optionsMap = optionsMap;
}

public ActivityOptions getMergedOptions(String activityType) {
ActivityOptions overrideOptions = overridden.getMergedOptions(activityType);
return merge(overrideOptions, defaultOptions, optionsMap.get(activityType));
}

/** later options override the previous ones */
private static ActivityOptions merge(ActivityOptions... options) {
if (options == null || options.length == 0) {
return null;
}
ActivityOptions result = options[0];
for (int i = 1; i < options.length; i++) {
result = result.toBuilder().mergeActivityOptions(options[i]).build();
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,36 @@
import io.temporal.activity.ActivityOptions;
import io.temporal.common.MethodRetry;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.internal.common.ActivityOptionsWithDefault;
import io.temporal.workflow.ActivityStub;
import io.temporal.workflow.Functions;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

@VisibleForTesting
public class ActivityInvocationHandler extends ActivityInvocationHandlerBase {
private final ActivityOptions options;
private final Map<String, ActivityOptions> activityMethodOptions;
private final ActivityOptionsWithDefault options;
private final WorkflowOutboundCallsInterceptor activityExecutor;
private final Functions.Proc assertReadOnly;

@VisibleForTesting
public static InvocationHandler newInstance(
Class<?> activityInterface,
ActivityOptions options,
Map<String, ActivityOptions> methodOptions,
ActivityOptionsWithDefault options,
WorkflowOutboundCallsInterceptor activityExecutor,
Functions.Proc assertReadOnly) {
return new ActivityInvocationHandler(
activityInterface, activityExecutor, options, methodOptions, assertReadOnly);
activityInterface, activityExecutor, options, assertReadOnly);
}

private ActivityInvocationHandler(
Class<?> activityInterface,
WorkflowOutboundCallsInterceptor activityExecutor,
ActivityOptions options,
Map<String, ActivityOptions> methodOptions,
ActivityOptionsWithDefault options,
Functions.Proc assertReadOnly) {
super(activityInterface);
this.options = options;
this.activityMethodOptions = (methodOptions == null) ? new HashMap<>() : methodOptions;
this.activityExecutor = activityExecutor;
this.assertReadOnly = assertReadOnly;
}
Expand All @@ -67,11 +62,7 @@ private ActivityInvocationHandler(
protected Function<Object[], Object> getActivityFunc(
Method method, MethodRetry methodRetry, String activityName) {
Function<Object[], Object> function;
ActivityOptions merged =
ActivityOptions.newBuilder(options)
.mergeActivityOptions(this.activityMethodOptions.get(activityName))
.mergeMethodRetry(methodRetry)
.build();
ActivityOptions merged = options.getMergedOptions(activityName);
if (merged.getStartToCloseTimeout() == null && merged.getScheduleToCloseTimeout() == null) {
throw new IllegalArgumentException(
"Both StartToCloseTimeout and ScheduleToCloseTimeout aren't specified for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.failure.*;
import io.temporal.internal.common.ActivityOptionUtils;
import io.temporal.internal.common.ActivityOptionsWithDefault;
import io.temporal.internal.common.HeaderUtils;
import io.temporal.internal.common.OptionsUtils;
import io.temporal.internal.common.ProtobufTimeUtils;
Expand Down Expand Up @@ -119,8 +120,15 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall
private WorkflowInboundCallsInterceptor headInboundInterceptor;
private WorkflowOutboundCallsInterceptor headOutboundInterceptor;

private ActivityOptions defaultActivityOptions = null;
private Map<String, ActivityOptions> activityOptionsMap;
/** Activity options specified through {@link WorkflowImplementationOptions}. */
private ActivityOptionsWithDefault workflowImplementationOptionsActivityOptions;

/**
* Activity options specified through {@link Workflow#setDefaultActivityOptions(ActivityOptions)}
* and {@link Workflow#applyActivityOptions(Map)}
*/
private ActivityOptionsWithDefault workflowActivityOptions;

private LocalActivityOptions defaultLocalActivityOptions = null;
private Map<String, LocalActivityOptions> localActivityOptionsMap;
private boolean readOnly = false;
Expand All @@ -145,8 +153,11 @@ public SyncWorkflowContext(
this.queryDispatcher = queryDispatcher;
this.updateDispatcher = updateDispatcher;
if (workflowImplementationOptions != null) {
this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions();
this.activityOptionsMap = new HashMap<>(workflowImplementationOptions.getActivityOptions());
this.workflowImplementationOptionsActivityOptions =
new ActivityOptionsWithDefault(
null,
workflowImplementationOptions.getDefaultActivityOptions(),
workflowImplementationOptions.getActivityOptions());
this.defaultLocalActivityOptions =
workflowImplementationOptions.getDefaultLocalActivityOptions();
this.localActivityOptionsMap =
Expand Down Expand Up @@ -200,14 +211,8 @@ public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head
updateDispatcher.setInboundCallsInterceptor(head);
}

public ActivityOptions getDefaultActivityOptions() {
return defaultActivityOptions;
}

public @Nonnull Map<String, ActivityOptions> getActivityOptions() {
return activityOptionsMap != null
? Collections.unmodifiableMap(activityOptionsMap)
: Collections.emptyMap();
public ActivityOptionsWithDefault getActivityOptions() {
return workflowImplementationOptionsActivityOptions;
}

public LocalActivityOptions getDefaultLocalActivityOptions() {
Expand All @@ -221,21 +226,11 @@ public LocalActivityOptions getDefaultLocalActivityOptions() {
}

public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
this.defaultActivityOptions =
(this.defaultActivityOptions == null)
? defaultActivityOptions
: this.defaultActivityOptions.toBuilder()
.mergeActivityOptions(defaultActivityOptions)
.build();
workflowActivityOptions.setDefaultOptions(defaultActivityOptions);
}

public void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOption) {
Objects.requireNonNull(activityTypeToOption);
if (this.activityOptionsMap == null) {
this.activityOptionsMap = new HashMap<>(activityTypeToOption);
return;
}
ActivityOptionUtils.mergePredefinedActivityOptions(activityOptionsMap, activityTypeToOption);
workflowActivityOptions.setOptionsMap(activityTypeToOption);
}

public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
import io.temporal.internal.WorkflowThreadMarker;
import io.temporal.internal.common.ActivityOptionUtils;
import io.temporal.internal.common.ActivityOptionsWithDefault;
import io.temporal.internal.common.NonIdempotentHandle;
import io.temporal.internal.common.SearchAttributesUtil;
import io.temporal.internal.logging.ReplayAwareLogger;
Expand Down Expand Up @@ -300,29 +301,14 @@ public static <T> T newActivityStub(
// Merge the activity options we may have received from the workflow with the options we may
// have received in WorkflowImplementationOptions.
SyncWorkflowContext context = getRootWorkflowContext();
options = (options == null) ? context.getDefaultActivityOptions() : options;

Map<String, ActivityOptions> mergedActivityOptionsMap;
@Nonnull Map<String, ActivityOptions> predefinedActivityOptions = context.getActivityOptions();
if (activityMethodOptions != null
&& !activityMethodOptions.isEmpty()
&& predefinedActivityOptions.isEmpty()) {
// we need to merge only in this case
mergedActivityOptionsMap = new HashMap<>(predefinedActivityOptions);
ActivityOptionUtils.mergePredefinedActivityOptions(
mergedActivityOptionsMap, activityMethodOptions);
} else {
mergedActivityOptionsMap =
MoreObjects.firstNonNull(
activityMethodOptions,
MoreObjects.firstNonNull(predefinedActivityOptions, Collections.emptyMap()));
}
ActivityOptionsWithDefault optionsWithDefault =
new ActivityOptionsWithDefault(
context.getActivityOptions(), options, activityMethodOptions);

InvocationHandler invocationHandler =
ActivityInvocationHandler.newInstance(
activityInterface,
options,
mergedActivityOptionsMap,
optionsWithDefault,
context.getWorkflowOutboundInterceptor(),
() -> assertNotReadOnly("schedule activity"));
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.temporal.internal.activity.ActivityExecutionContextFactory;
import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl;
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
import io.temporal.internal.common.ActivityOptionsWithDefault;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.sync.*;
import io.temporal.internal.testservice.InProcessGRPCServer;
Expand Down Expand Up @@ -188,9 +189,11 @@ public <T> T newActivityStub(Class<T> activityInterface) {
.setScheduleToCloseTimeout(Duration.ofDays(1))
.setHeartbeatTimeout(Duration.ofSeconds(1))
.build();
ActivityOptionsWithDefault optionsWithDefault =
new ActivityOptionsWithDefault(null, options, null);
InvocationHandler invocationHandler =
ActivityInvocationHandler.newInstance(
activityInterface, options, null, new TestActivityExecutor(), () -> {});
activityInterface, optionsWithDefault, new TestActivityExecutor(), () -> {});
invocationHandler =
new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
Expand All @@ -204,9 +207,12 @@ public <T> T newActivityStub(Class<T> activityInterface) {
*/
@Override
public <T> T newActivityStub(Class<T> activityInterface, ActivityOptions options) {
ActivityOptionsWithDefault optionsWithDefault =
new ActivityOptionsWithDefault(null, options, null);

InvocationHandler invocationHandler =
ActivityInvocationHandler.newInstance(
activityInterface, options, null, new TestActivityExecutor(), () -> {});
activityInterface, optionsWithDefault, new TestActivityExecutor(), () -> {});
invocationHandler =
new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
Expand Down

0 comments on commit 51249bd

Please sign in to comment.