Skip to content

Commit

Permalink
Ensure CancellableQueue doesn't keep references to logically removed …
Browse files Browse the repository at this point in the history
…objects. (#27403) (#27419)

Co-authored-by: scwhittle <scwhittle@users.noreply.github.com>
  • Loading branch information
Abacn and scwhittle committed Jul 10, 2023
1 parent ff924a0 commit 6ce85e5
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A simplified {@link ThreadSafe} blocking queue that can be cancelled freeing any blocked {@link
Expand All @@ -36,7 +36,7 @@
public class CancellableQueue<T extends @NonNull Object> {

private final int capacity;
private final Object[] elements;
private final @Nullable Object[] elements;
private final Lock lock;
private final Condition notFull;
private final Condition notEmpty;
Expand Down Expand Up @@ -86,8 +86,9 @@ public void put(T t) throws Exception, InterruptedException {
* must invoke {@link #cancel} if the interrupt is unrecoverable.
* @throws Exception if the queue is cancelled.
*/
@SuppressWarnings({"cast"})
public T take() throws Exception, InterruptedException {
Object rval;
T rval;
try {
lock.lockInterruptibly();
while (count == 0 && cancellationException == null) {
Expand All @@ -97,14 +98,15 @@ public T take() throws Exception, InterruptedException {
throw cancellationException;
}

rval = elements[takeIndex];
rval = (T) elements[takeIndex];
elements[takeIndex] = null;
takeIndex = (takeIndex + 1) % elements.length;
count -= 1;
notFull.signal();
} finally {
lock.unlock();
}
return (T) rval;
return rval;
}

/**
Expand All @@ -119,6 +121,7 @@ public void cancel(Exception exception) {
try {
if (cancellationException == null) {
cancellationException = exception;
clearElementsLocked();
}
notEmpty.signalAll();
notFull.signalAll();
Expand All @@ -127,14 +130,21 @@ public void cancel(Exception exception) {
}
}

private void clearElementsLocked() {
for (int i = takeIndex; count > 0; i = (i + 1) % elements.length) {
elements[i] = null;
--count;
}
addIndex = 0;
takeIndex = 0;
}

/** Enables the queue to be re-used after it has been cancelled. */
public void reset() {
lock.lock();
try {
cancellationException = null;
addIndex = 0;
takeIndex = 0;
count = 0;
clearElementsLocked();
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -219,4 +221,30 @@ public void testFirstCancellationError() throws Exception {
queue.cancel(new RuntimeException("Second cancel exception"));
assertThrows("First cancel exception", RuntimeException.class, () -> queue.take());
}

@Test
public void testMemoryReferenceOnTake() throws Exception {
String s1 = new String("test1");
String s2 = new String("test2");
WeakReference<String> weakReference1 = new WeakReference<>(s1);
WeakReference<String> weakReference2 = new WeakReference<>(s2);
CancellableQueue<String> queue = new CancellableQueue<>(100);
queue.put(s1);
queue.put(s2);
s1 = null;
s2 = null;
System.gc();
assertTrue(weakReference1.get() != null);
assertTrue(weakReference2.get() != null);

assertEquals("test1", queue.take());
System.gc();
assertTrue(weakReference1.get() == null);
assertTrue(weakReference2.get() != null);

queue.reset();
System.gc();
assertTrue(weakReference1.get() == null);
assertTrue(weakReference2.get() == null);
}
}

0 comments on commit 6ce85e5

Please sign in to comment.