Skip to content

Commit

Permalink
Fixed precedence of LocalActivityOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev committed Apr 19, 2024
1 parent 53bf5b8 commit be32eeb
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 113 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.common;

import io.temporal.activity.LocalActivityOptions;
import java.util.HashMap;
import java.util.Map;

/**
* The chain of LocalActivityOptions and per type options maps. Used to merge options specified at
* the following layers:
*
* <pre>
* * WorkflowImplementationOptions
* * Workflow
* * ActivityStub
* </pre>
*
* Each next layer overrides specific options specified at the previous layer.
*/
public final class MergedLocalActivityOptions {

/** Common options across all activity types. */
private LocalActivityOptions defaultOptions;

/** Per activity type options. These override defaultOptions. */
private final Map<String, LocalActivityOptions> optionsMap = new HashMap<>();

/** The options specified at the previous layer. They are overriden by this object. */
private final MergedLocalActivityOptions overridden;

public MergedLocalActivityOptions(
MergedLocalActivityOptions overridden,
LocalActivityOptions defaultOptions,
Map<String, LocalActivityOptions> optionsMap) {
this.overridden = overridden;
this.defaultOptions = defaultOptions;
if (optionsMap != null) {
this.optionsMap.putAll(optionsMap);
}
}

public MergedLocalActivityOptions(MergedLocalActivityOptions overridden) {
this.overridden = overridden;
defaultOptions = null;
}

public void setDefaultOptions(LocalActivityOptions defaultOptions) {
this.defaultOptions = defaultOptions;
}

public void applyOptionsMap(Map<String, LocalActivityOptions> optionsMap) {
if (optionsMap != null) {
this.optionsMap.putAll(optionsMap);
}
}

/** Get merged options for the given activityType. */
public LocalActivityOptions getMergedOptions(String activityType) {
LocalActivityOptions overrideOptions = null;
if (overridden != null) {
overrideOptions = overridden.getMergedOptions(activityType);
}
return merge(overrideOptions, defaultOptions, optionsMap.get(activityType));
}

/** later options override the previous ones */
private static LocalActivityOptions merge(LocalActivityOptions... options) {
if (options == null || options.length == 0) {
return null;
}
LocalActivityOptions result = options[0];
for (int i = 1; i < options.length; i++) {
if (result == null) {
result = options[i];
} else {
result = result.toBuilder().mergeActivityOptions(options[i]).build();
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,36 @@
import io.temporal.activity.LocalActivityOptions;
import io.temporal.common.MethodRetry;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.internal.common.MergedLocalActivityOptions;
import io.temporal.workflow.ActivityStub;
import io.temporal.workflow.Functions;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

@VisibleForTesting
public class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase {
private final LocalActivityOptions options;
private final Map<String, LocalActivityOptions> activityMethodOptions;
private final MergedLocalActivityOptions options;
private final WorkflowOutboundCallsInterceptor activityExecutor;
private final Functions.Proc assertReadOnly;

@VisibleForTesting
public static InvocationHandler newInstance(
Class<?> activityInterface,
LocalActivityOptions options,
Map<String, LocalActivityOptions> methodOptions,
MergedLocalActivityOptions options,
WorkflowOutboundCallsInterceptor activityExecutor,
Functions.Proc assertReadOnly) {
return new LocalActivityInvocationHandler(
activityInterface, activityExecutor, options, methodOptions, assertReadOnly);
activityInterface, activityExecutor, options, assertReadOnly);
}

private LocalActivityInvocationHandler(
Class<?> activityInterface,
WorkflowOutboundCallsInterceptor activityExecutor,
LocalActivityOptions options,
Map<String, LocalActivityOptions> methodOptions,
MergedLocalActivityOptions options,
Functions.Proc assertReadOnly) {
super(activityInterface);
this.options = options;
this.activityMethodOptions = (methodOptions == null) ? new HashMap<>() : methodOptions;
this.activityExecutor = activityExecutor;
this.assertReadOnly = assertReadOnly;
}
Expand All @@ -68,11 +63,7 @@ private LocalActivityInvocationHandler(
public Function<Object[], Object> getActivityFunc(
Method method, MethodRetry methodRetry, String activityName) {
Function<Object[], Object> function;
LocalActivityOptions mergedOptions =
LocalActivityOptions.newBuilder(options)
.mergeActivityOptions(activityMethodOptions.get(activityName))
.setMethodRetry(methodRetry)
.build();
LocalActivityOptions mergedOptions = options.getMergedOptions(activityName);
ActivityStub stub =
LocalActivityStubImpl.newInstance(mergedOptions, activityExecutor, assertReadOnly);
function =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.failure.*;
import io.temporal.internal.common.ActivityOptionUtils;
import io.temporal.internal.common.HeaderUtils;
import io.temporal.internal.common.MergedActivityOptions;
import io.temporal.internal.common.MergedLocalActivityOptions;
import io.temporal.internal.common.OptionsUtils;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.SdkFlag;
Expand Down Expand Up @@ -121,9 +121,8 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall
private WorkflowOutboundCallsInterceptor headOutboundInterceptor;

private final MergedActivityOptions activityOptions;
private final MergedLocalActivityOptions localActivityOptions;

private LocalActivityOptions defaultLocalActivityOptions = null;
private Map<String, LocalActivityOptions> localActivityOptionsMap;
private boolean readOnly = false;

public SyncWorkflowContext(
Expand All @@ -145,20 +144,22 @@ public SyncWorkflowContext(
this.signalDispatcher = signalDispatcher;
this.queryDispatcher = queryDispatcher;
this.updateDispatcher = updateDispatcher;
MergedActivityOptions activityOptionsFromWorkflowImplementationOptions = null;
MergedActivityOptions activityOptions = null;
MergedLocalActivityOptions localActivityOptions = null;
if (workflowImplementationOptions != null) {
activityOptionsFromWorkflowImplementationOptions =
activityOptions =
new MergedActivityOptions(
null,
workflowImplementationOptions.getDefaultActivityOptions(),
workflowImplementationOptions.getActivityOptions());
this.defaultLocalActivityOptions =
workflowImplementationOptions.getDefaultLocalActivityOptions();
this.localActivityOptionsMap =
new HashMap<>(workflowImplementationOptions.getLocalActivityOptions());
localActivityOptions =
new MergedLocalActivityOptions(
null,
workflowImplementationOptions.getDefaultLocalActivityOptions(),
workflowImplementationOptions.getLocalActivityOptions());
}
this.activityOptions =
new MergedActivityOptions(activityOptionsFromWorkflowImplementationOptions);
this.activityOptions = new MergedActivityOptions(activityOptions);
this.localActivityOptions = new MergedLocalActivityOptions(localActivityOptions);
this.workflowImplementationOptions =
workflowImplementationOptions == null
? WorkflowImplementationOptions.getDefaultInstance()
Expand Down Expand Up @@ -211,14 +212,8 @@ public MergedActivityOptions getActivityOptions() {
return activityOptions;
}

public LocalActivityOptions getDefaultLocalActivityOptions() {
return defaultLocalActivityOptions;
}

public @Nonnull Map<String, LocalActivityOptions> getLocalActivityOptions() {
return localActivityOptionsMap != null
? Collections.unmodifiableMap(localActivityOptionsMap)
: Collections.emptyMap();
public MergedLocalActivityOptions getLocalActivityOptions() {
return localActivityOptions;
}

public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
Expand All @@ -230,22 +225,11 @@ public void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOpti
}

public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
this.defaultLocalActivityOptions =
(this.defaultLocalActivityOptions == null)
? defaultLocalActivityOptions
: this.defaultLocalActivityOptions.toBuilder()
.mergeActivityOptions(defaultLocalActivityOptions)
.build();
localActivityOptions.setDefaultOptions(defaultLocalActivityOptions);
}

public void applyLocalActivityOptions(Map<String, LocalActivityOptions> activityTypeToOption) {
Objects.requireNonNull(activityTypeToOption);
if (this.localActivityOptionsMap == null) {
this.localActivityOptionsMap = new HashMap<>(activityTypeToOption);
return;
}
ActivityOptionUtils.mergePredefinedLocalActivityOptions(
localActivityOptionsMap, activityTypeToOption);
localActivityOptions.applyOptionsMap(activityTypeToOption);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal;

import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityOptions;
Expand All @@ -41,8 +40,8 @@
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
import io.temporal.internal.WorkflowThreadMarker;
import io.temporal.internal.common.ActivityOptionUtils;
import io.temporal.internal.common.MergedActivityOptions;
import io.temporal.internal.common.MergedLocalActivityOptions;
import io.temporal.internal.common.NonIdempotentHandle;
import io.temporal.internal.common.SearchAttributesUtil;
import io.temporal.internal.logging.ReplayAwareLogger;
Expand Down Expand Up @@ -328,31 +327,14 @@ public static <T> T newLocalActivityStub(
// Merge the activity options we may have received from the workflow with the options we may
// have received in WorkflowImplementationOptions.
SyncWorkflowContext context = getRootWorkflowContext();
options = (options == null) ? context.getDefaultLocalActivityOptions() : options;

Map<String, LocalActivityOptions> mergedLocalActivityOptionsMap;
@Nonnull
Map<String, LocalActivityOptions> predefinedLocalActivityOptions =
context.getLocalActivityOptions();
if (activityMethodOptions != null
&& !activityMethodOptions.isEmpty()
&& predefinedLocalActivityOptions.isEmpty()) {
// we need to merge only in this case
mergedLocalActivityOptionsMap = new HashMap<>(predefinedLocalActivityOptions);
ActivityOptionUtils.mergePredefinedLocalActivityOptions(
mergedLocalActivityOptionsMap, activityMethodOptions);
} else {
mergedLocalActivityOptionsMap =
MoreObjects.firstNonNull(
activityMethodOptions,
MoreObjects.firstNonNull(predefinedLocalActivityOptions, Collections.emptyMap()));
}
MergedLocalActivityOptions activityOptions =
new MergedLocalActivityOptions(
context.getLocalActivityOptions(), options, activityMethodOptions);

InvocationHandler invocationHandler =
LocalActivityInvocationHandler.newInstance(
activityInterface,
options,
mergedLocalActivityOptionsMap,
activityOptions,
WorkflowInternal.getWorkflowOutboundInterceptor(),
() -> assertNotReadOnly("schedule local activity"));
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
Expand Down

0 comments on commit be32eeb

Please sign in to comment.