Skip to content

Commit

Permalink
[PLAT-6306] Add back tasktypes module
Browse files Browse the repository at this point in the history
Summary:
This change reverts commit d7afbc6
This also fixes the OOM caused by mockito memory leak:
See: mockito/mockito#1614
The mockito internal map was using 95% of the memory at the time of the OOM.

As suggested we are now calling
```
    Mockito.framework().clearInlineMocks()
```

However we are calling only once per class instead of after each test method.
This is because it interfears with JunitMockito rule.
Alternative is to add rull ordering as suggested in:
mockito/mockito#1902 (comment)

Test Plan: This fixes the tests

Reviewers: yshchetinin, hzare

Reviewed By: yshchetinin, hzare

Subscribers: yshchetinin, jenkins-bot, yugaware

Differential Revision: https://phabricator.dev.yugabyte.com/D21333
  • Loading branch information
sb-yb authored and jayant07-yb committed Dec 7, 2022
1 parent 9569fdd commit 42aa861
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 258 deletions.
4 changes: 3 additions & 1 deletion managed/src/main/java/Module.java
Expand Up @@ -11,8 +11,8 @@
import com.yugabyte.yw.commissioner.DefaultExecutorServiceProvider;
import com.yugabyte.yw.commissioner.ExecutorServiceProvider;
import com.yugabyte.yw.commissioner.HealthChecker;
import com.yugabyte.yw.commissioner.SetUniverseKey;
import com.yugabyte.yw.commissioner.PitrConfigPoller;
import com.yugabyte.yw.commissioner.SetUniverseKey;
import com.yugabyte.yw.commissioner.SupportBundleCleanup;
import com.yugabyte.yw.commissioner.TaskExecutor;
import com.yugabyte.yw.commissioner.TaskGarbageCollector;
Expand Down Expand Up @@ -58,6 +58,7 @@
import com.yugabyte.yw.controllers.MetricGrafanaController;
import com.yugabyte.yw.controllers.PlatformHttpActionAdapter;
import com.yugabyte.yw.metrics.MetricQueryHelper;
import com.yugabyte.yw.models.helpers.TaskTypesModule;
import com.yugabyte.yw.queries.QueryHelper;
import com.yugabyte.yw.scheduler.Scheduler;
import de.dentrassi.crypto.pem.PemKeyStoreProvider;
Expand Down Expand Up @@ -99,6 +100,7 @@ public void configure() {
} else {
log.info("Using Evolutions. Not using flyway migrations.");
}
install(new TaskTypesModule());

Security.addProvider(new PemKeyStoreProvider());
Security.addProvider(new BouncyCastleProvider());
Expand Down
Expand Up @@ -85,7 +85,7 @@ public Commissioner(
* @return true if abortable.
*/
public boolean isTaskAbortable(TaskType taskType) {
return TaskExecutor.isTaskAbortable(taskExecutor.getTaskClass(taskType));
return TaskExecutor.isTaskAbortable(taskType.getTaskClass());
}

/**
Expand All @@ -95,7 +95,7 @@ public boolean isTaskAbortable(TaskType taskType) {
* @return true if retryable.
*/
public boolean isTaskRetryable(TaskType taskType) {
return TaskExecutor.isTaskRetryable(taskExecutor.getTaskClass(taskType));
return TaskExecutor.isTaskRetryable(taskType.getTaskClass());
}

/**
Expand Down
Expand Up @@ -9,11 +9,9 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.api.client.util.Throwables;
import com.google.inject.Provider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.Sets;
import com.google.inject.Provider;
import com.yugabyte.yw.commissioner.ITask.Abortable;
import com.yugabyte.yw.commissioner.ITask.Retryable;
import com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType;
Expand All @@ -35,7 +33,6 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -61,8 +58,6 @@
import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import play.Application;
import play.api.Play;

/**
* TaskExecutor is the executor service for tasks and their subtasks. It is very similar to the
Expand Down Expand Up @@ -122,7 +117,7 @@
public class TaskExecutor {

// This is a map from the task types to the classes.
private final BiMap<TaskType, Class<? extends ITask>> taskTypeClassBiMap;
private final Map<TaskType, Provider<ITask>> taskTypeMap;

// Task futures are waited for this long before checking abort status.
private static final long TASK_SPIN_WAIT_INTERVAL_MS = 2000;
Expand All @@ -142,8 +137,7 @@ public class TaskExecutor {

// A utility for Platform HA.
private final PlatformReplicationManager replicationManager;

private final Provider<Application> application;
private final Map<Class<? extends ITask>, TaskType> inverseTaskTypeMap;

private final AtomicBoolean isShutdown = new AtomicBoolean();

Expand Down Expand Up @@ -171,27 +165,6 @@ public class TaskExecutor {
KnownAlertLabels.TASK_TYPE.labelName(),
KnownAlertLabels.RESULT.labelName());

private Map<TaskType, Class<? extends ITask>> buildTaskTypesMap() {
// Initialize the map which holds the task types to their task class.
Map<TaskType, Class<? extends ITask>> typeMap = new HashMap<>();

for (TaskType taskType : TaskType.filteredValues()) {
// TODO: switch to guice map binder instead of this reflection usage.
String className = "com.yugabyte.yw.commissioner.tasks." + taskType.toString();
Class<? extends ITask> taskClass;
try {
taskClass = Class.forName(className).asSubclass(ITask.class);
typeMap.put(taskType, taskClass);
log.debug("Found task: {}", className);
} catch (ClassNotFoundException e) {
log.error("Could not find task for task type " + taskType, e);
throw new RuntimeException(e);
}
}
log.debug("Done loading tasks.");
return typeMap;
}

private static Summary buildSummary(String name, String description, String... labelNames) {
return Summary.build(name, description)
.quantile(0.5, 0.05)
Expand All @@ -217,11 +190,6 @@ private static void writeTaskStateMetric(
.observe(getDurationSeconds(startTime, endTime));
}

Class<? extends ITask> getTaskClass(TaskType taskType) {
checkNotNull(taskType, "Task type must be non-null");
return taskTypeClassBiMap.get(taskType);
}

// It looks for the annotation starting from the current class to its super classes until it
// finds. If it is not found, it returns false, else the value of enabled is returned. It is
// possible to override an annotation already defined in the superclass.
Expand All @@ -248,27 +216,26 @@ static boolean isTaskRetryable(Class<? extends ITask> taskClass) {
*/
public TaskType getTaskType(Class<? extends ITask> taskClass) {
checkNotNull(taskClass, "Task class must be non-null");
return taskTypeClassBiMap.inverse().get(taskClass);
return inverseTaskTypeMap.get(taskClass);
}

@Inject
public TaskExecutor(
Provider<Application> application,
ShutdownHookHandler shutdownHookHandler,
ExecutorServiceProvider executorServiceProvider,
PlatformReplicationManager replicationManager) {
this.application = application;
PlatformReplicationManager replicationManager,
Map<TaskType, Provider<ITask>> taskTypeMap,
Map<Class<? extends ITask>, TaskType> inverseTaskTypeMap) {
this.executorServiceProvider = executorServiceProvider;
this.replicationManager = replicationManager;
this.taskOwner = Util.getHostname();
this.skipSubTaskAbortableCheck = true;
shutdownHookHandler.addShutdownHook(
TaskExecutor.this,
(taskExecutor) -> {
taskExecutor.shutdown(Duration.ofMinutes(5));
},
(taskExecutor) -> taskExecutor.shutdown(Duration.ofMinutes(5)),
100 /* weight */);
taskTypeClassBiMap = ImmutableBiMap.copyOf(buildTaskTypesMap());
this.taskTypeMap = taskTypeMap;
this.inverseTaskTypeMap = inverseTaskTypeMap;
}

// Shuts down the task executor.
Expand Down Expand Up @@ -314,7 +281,7 @@ private void checkTaskExecutorState() {
public RunnableTask createRunnableTask(TaskType taskType, ITaskParams taskParams) {
checkNotNull(taskType, "Task type must be set");
checkNotNull(taskParams, "Task params must be set");
ITask task = this.application.get().injector().instanceOf(taskTypeClassBiMap.get(taskType));
ITask task = taskTypeMap.get(taskType).get();
task.initialize(taskParams);
return createRunnableTask(task);
}
Expand Down Expand Up @@ -470,7 +437,7 @@ public SubTaskGroup createSubTaskGroup(

@VisibleForTesting
TaskInfo createTaskInfo(ITask task) {
TaskType taskType = TaskType.valueOf(task.getClass().getSimpleName());
TaskType taskType = getTaskType(task.getClass());
// Create a new task info object.
TaskInfo taskInfo = new TaskInfo(taskType);
// Set the task details.
Expand Down Expand Up @@ -970,7 +937,7 @@ public class RunnableTask extends AbstractRunnableTask {
private final TaskCache taskCache = new TaskCache();
// Current execution position of subtasks.
private int subTaskPosition = 0;
private AtomicReference<TaskExecutionListener> taskExecutionListenerRef =
private final AtomicReference<TaskExecutionListener> taskExecutionListenerRef =
new AtomicReference<>();
// Time when the abort is set.
private volatile Instant abortTime;
Expand Down

0 comments on commit 42aa861

Please sign in to comment.