Skip to content

Commit

Permalink
Fix compare and update race condition in Profiler.
Browse files Browse the repository at this point in the history
I added AtomicReferences to timeSeries to be sure that timeSereis wasn't nullified. To implement this change I needed to make TimeSeries thread-safe.

PiperOrigin-RevId: 633477989
Change-Id: I1a2e00a32b0d9c613d63445299b2abd422545bd7
  • Loading branch information
wilwell authored and Copybara-Service committed May 14, 2024
1 parent 905a4fe commit a448f1c
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 37 deletions.
51 changes: 28 additions & 23 deletions src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,15 +325,19 @@ ImmutableList<SlowTask> getSlowestTasks() {
/** Collects local cpu usage data (if enabled). */
private LocalResourceCollector localResourceCollector;

private TimeSeries actionCountTimeSeries;
private TimeSeries actionCacheCountTimeSeries;
private TimeSeries localActionCountTimeSeries;
private final AtomicReference<TimeSeries> actionCountTimeSeriesRef;
private final AtomicReference<TimeSeries> actionCacheCountTimeSeriesRef;
private final AtomicReference<TimeSeries> localActionCountTimeSeriesRef;

private Duration actionCountStartTime;
private boolean collectTaskHistograms;
private boolean includePrimaryOutput;
private boolean includeTargetLabel;

private Profiler() {
actionCountTimeSeriesRef = new AtomicReference<>();
actionCacheCountTimeSeriesRef = new AtomicReference<>();
localActionCountTimeSeriesRef = new AtomicReference<>();
initHistograms();
for (ProfilerTask task : ProfilerTask.values()) {
if (task.collectsSlowestInstances) {
Expand Down Expand Up @@ -449,11 +453,12 @@ public synchronized void start(
this.profiledTasks = profiledTasks.isEmpty() ? profiledTasks : EnumSet.copyOf(profiledTasks);
this.clock = clock;
this.actionCountStartTime = Duration.ofNanos(clock.nanoTime());
this.actionCountTimeSeries = new TimeSeries(actionCountStartTime, ACTION_COUNT_BUCKET_DURATION);
this.actionCacheCountTimeSeries =
new TimeSeries(actionCountStartTime, ACTION_COUNT_BUCKET_DURATION);
this.localActionCountTimeSeries =
new TimeSeries(actionCountStartTime, ACTION_COUNT_BUCKET_DURATION);
this.actionCountTimeSeriesRef.set(
new TimeSeries(actionCountStartTime, ACTION_COUNT_BUCKET_DURATION));
this.actionCacheCountTimeSeriesRef.set(
new TimeSeries(actionCountStartTime, ACTION_COUNT_BUCKET_DURATION));
this.localActionCountTimeSeriesRef.set(
new TimeSeries(actionCountStartTime, ACTION_COUNT_BUCKET_DURATION));
this.collectTaskHistograms = collectTaskHistograms;
this.includePrimaryOutput = includePrimaryOutput;
this.includeTargetLabel = includeTargetLabel;
Expand Down Expand Up @@ -509,24 +514,27 @@ private void collectActionCounts() {
Duration endTime = Duration.ofNanos(clock.nanoTime());
int len = (int) endTime.minus(actionCountStartTime).dividedBy(ACTION_COUNT_BUCKET_DURATION) + 1;
Map<ProfilerTask, double[]> counterSeriesMap = new LinkedHashMap<>();
TimeSeries actionCountTimeSeries = actionCountTimeSeriesRef.get();
if (actionCountTimeSeries != null) {
double[] actionCountValues = actionCountTimeSeries.toDoubleArray(len);
actionCountTimeSeries = null;
actionCountTimeSeriesRef.set(null);
counterSeriesMap.put(ProfilerTask.ACTION_COUNTS, actionCountValues);
}
TimeSeries actionCacheCountTimeSeries = actionCacheCountTimeSeriesRef.get();
if (actionCacheCountTimeSeries != null) {
double[] actionCacheCountValues = actionCacheCountTimeSeries.toDoubleArray(len);
actionCacheCountTimeSeries = null;
actionCacheCountTimeSeriesRef.set(null);
counterSeriesMap.put(ProfilerTask.ACTION_CACHE_COUNTS, actionCacheCountValues);
}
if (!counterSeriesMap.isEmpty()) {
instance.logCounters(counterSeriesMap, actionCountStartTime, ACTION_COUNT_BUCKET_DURATION);
}

Map<ProfilerTask, double[]> localCounterSeriesMap = new LinkedHashMap<>();
TimeSeries localActionCountTimeSeries = localActionCountTimeSeriesRef.get();
if (localActionCountTimeSeries != null) {
double[] localActionCountValues = localActionCountTimeSeries.toDoubleArray(len);
localActionCountTimeSeries = null;
localActionCountTimeSeriesRef.set(null);
localCounterSeriesMap.put(ProfilerTask.LOCAL_ACTION_COUNTS, localActionCountValues);
}
if (hasNonZeroValues(localCounterSeriesMap)) {
Expand Down Expand Up @@ -874,24 +882,21 @@ private void recordTask(TaskData data) {
writer.enqueue(data);
}
long endTimeNanos = data.startTimeNanos + data.durationNanos;
TimeSeries actionCountTimeSeries = actionCountTimeSeriesRef.get();
TimeSeries actionCacheCountTimeSeries = actionCacheCountTimeSeriesRef.get();
TimeSeries localActionCountTimeSeries = localActionCountTimeSeriesRef.get();
if (actionCountTimeSeries != null && countAction(data.type)) {
synchronized (this) {
actionCountTimeSeries.addRange(
Duration.ofNanos(data.startTimeNanos), Duration.ofNanos(endTimeNanos));
}
actionCountTimeSeries.addRange(
Duration.ofNanos(data.startTimeNanos), Duration.ofNanos(endTimeNanos));
}
if (actionCacheCountTimeSeries != null && data.type == ProfilerTask.ACTION_CHECK) {
synchronized (this) {
actionCacheCountTimeSeries.addRange(
Duration.ofNanos(data.startTimeNanos), Duration.ofNanos(endTimeNanos));
}
actionCacheCountTimeSeries.addRange(
Duration.ofNanos(data.startTimeNanos), Duration.ofNanos(endTimeNanos));
}

if (localActionCountTimeSeries != null && data.type == ProfilerTask.LOCAL_ACTION_COUNTS) {
synchronized (this) {
localActionCountTimeSeries.addRange(
Duration.ofNanos(data.startTimeNanos), Duration.ofNanos(endTimeNanos));
}
localActionCountTimeSeries.addRange(
Duration.ofNanos(data.startTimeNanos), Duration.ofNanos(endTimeNanos));
}
SlowestTaskAggregator aggregator = slowestTasks[data.type.ordinal()];
if (aggregator != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.profiler;

import static java.lang.Math.max;

import java.time.Duration;
import java.util.Arrays;
import javax.annotation.concurrent.GuardedBy;

/**
* Converts a set of ranges into a graph by counting the number of ranges that are active at any
Expand All @@ -25,6 +28,8 @@ public class TimeSeries {
private final Duration startTime;
private final long bucketSizeMillis;
private static final int INITIAL_SIZE = 100;

@GuardedBy("this")
private double[] data = new double[INITIAL_SIZE];

public TimeSeries(Duration startTime, Duration bucketDuration) {
Expand Down Expand Up @@ -65,25 +70,27 @@ public void addRange(Duration rangeStart, Duration rangeEnd, double value) {
missingEndFraction = 0;
}

// Resize data array if necessary so it can at least fit endPosition.
if (endPosition >= data.length) {
data = Arrays.copyOf(data, Math.max(endPosition + 1, 2 * data.length));
}

// Do the actual update.
for (int i = startPosition; i <= endPosition; i++) {
double fraction = 1;
if (i == startPosition) {
fraction -= missingStartFraction;
synchronized (this) {
// Resize data array if necessary so it can at least fit endPosition.
if (endPosition >= data.length) {
data = Arrays.copyOf(data, max(endPosition + 1, 2 * data.length));
}
if (i == endPosition) {
fraction -= missingEndFraction;

// Do the actual update.
for (int i = startPosition; i <= endPosition; i++) {
double fraction = 1;
if (i == startPosition) {
fraction -= missingStartFraction;
}
if (i == endPosition) {
fraction -= missingEndFraction;
}
data[i] += fraction * value;
}
data[i] += fraction * value;
}
}

public double[] toDoubleArray(int len) {
public synchronized double[] toDoubleArray(int len) {
return Arrays.copyOf(data, len);
}
}
1 change: 1 addition & 0 deletions src/test/java/com/google/devtools/build/lib/profiler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/worker:worker_process_status",
"//src/main/java/com/google/devtools/common/options",
"//src/test/java/com/google/devtools/build/lib/testutil",
"//src/test/java/com/google/devtools/build/lib/testutil:TestThread",
"//src/test/java/com/google/devtools/build/lib/testutil:TestUtils",
"//third_party:guava",
"//third_party:junit4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

import static com.google.common.truth.Truth.assertThat;

import com.google.devtools.build.lib.testutil.TestThread;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -77,4 +79,49 @@ public void testResize() {
expected[expected.length - 1] = 0.42;
assertThat(values).usingTolerance(1.0e-10).containsExactly(expected).inOrder();
}

@Test
public void testParallelism() throws Exception {
// Define two threads. One is writing 1 on odd places, and another writes 2 on even places.
TimeSeries timeSeries = new TimeSeries(Duration.ZERO, Duration.ofMillis(100));
CountDownLatch latch = new CountDownLatch(2);
TestThread thread1 =
new TestThread(
() -> {
latch.countDown();
latch.await();
for (int i = 0; i < 50; i++) {
timeSeries.addRange(
Duration.ofMillis(2 * i * 100), Duration.ofMillis((2 * i + 1) * 100), 1);
}
});
TestThread thread2 =
new TestThread(
() -> {
latch.countDown();
latch.await();
for (int i = 0; i < 50; i++) {
timeSeries.addRange(
Duration.ofMillis((2 * i + 1) * 100), Duration.ofMillis((2 * i + 2) * 100), 2);
}
});
double[] expected = new double[100];
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
expected[i] = 1;
} else {
expected[i] = 2;
}
}

thread1.start();
thread2.start();

thread1.joinAndAssertState(10000);
thread2.joinAndAssertState(10000);
assertThat(timeSeries.toDoubleArray(100))
.usingTolerance(1.0e-10)
.containsExactly(expected)
.inOrder();
}
}

0 comments on commit a448f1c

Please sign in to comment.