Skip to content

Commit

Permalink
Part of #1260: write a manually run concurrency test to tease out pro…
Browse files Browse the repository at this point in the history
…blem with LockFreePool
  • Loading branch information
cowtowncoder committed Apr 17, 2024
1 parent 33a99d3 commit 266b941
Showing 1 changed file with 159 additions and 0 deletions.
159 changes: 159 additions & 0 deletions src/test/java/perf/RecyclerPoolTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package perf;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.util.BufferRecycler;
import com.fasterxml.jackson.core.util.JsonRecyclerPools;
import com.fasterxml.jackson.core.util.RecyclerPool;

/**
* High-concurrency test that tries to see if unbounded {@link RecyclerPool}
* implementations grow without bounds or not.
*/
public class RecyclerPoolTest
{
final static int THREAD_COUNT = 100;

final static int RUNTIME_SECS = 30;

private final int _threadCount;
private final long _runtimeMsecs;

RecyclerPoolTest(int threadCount, int runtimeMinutes) {
_threadCount = threadCount;
_runtimeMsecs = TimeUnit.SECONDS.toMillis(runtimeMinutes);
}

public void testPool(JsonFactory jsonF)
throws InterruptedException
{
RecyclerPool<BufferRecycler> poolImpl = jsonF._getRecyclerPool();

final String poolName = poolImpl.getClass().getSimpleName();
final ExecutorService exec = Executors.newFixedThreadPool(_threadCount);
final AtomicLong calls = new AtomicLong();
final long startTime = System.currentTimeMillis();
final long endtimeMsecs = startTime + _runtimeMsecs;
final AtomicInteger threadsRunning = new AtomicInteger();

System.out.printf("Starting test of '%s' with %d threads, for %d seconds.\n",
poolImpl.getClass().getName(),
_threadCount, _runtimeMsecs / 1000L);

for (int i = 0; i < _threadCount; ++i) {
final int id = i;
threadsRunning.incrementAndGet();
exec.execute(new Runnable() {
@Override
public void run() {
testUntil(jsonF, endtimeMsecs, id, calls);
threadsRunning.decrementAndGet();
}
});
}

long currentTime;
long nextPrint = 0L;
// Print if exceeds expected max whenever, otherwise every 2.5 seconds
final int thresholdToPrint = _threadCount + 5;

while ((currentTime = System.currentTimeMillis()) < endtimeMsecs) {
int poolSize;

if ((poolSize = poolImpl.pooledCount()) > thresholdToPrint
|| (currentTime > nextPrint)) {
double secs = (currentTime - startTime) / 1000.0;
System.out.printf(" (%s) %.1fs, %d calls; %d threads; pool size: %d\n",
poolName, secs, calls.get(), threadsRunning.get(), poolSize);
Thread.sleep(100L);
nextPrint = currentTime + 2500L;
}
}

System.out.printf("Completed test of '%s' with %d threads running... wait termination\n",
poolImpl.getClass().getSimpleName(),
threadsRunning.get());
if (!exec.awaitTermination(2000, TimeUnit.MILLISECONDS)) {
System.out.printf("WARNING: ExecutorService.awaitTermination() failed: %d threads left; will shut down.\n",
threadsRunning.get());
exec.shutdown();
}
}

void testUntil(JsonFactory jsonF,
long endTimeMsecs, int threadId, AtomicLong calls)
{
final Random rnd = new Random(threadId);
final byte[] JSON_INPUT = "\"abc\"".getBytes(StandardCharsets.UTF_8);

while (System.currentTimeMillis() < endTimeMsecs) {
try {
// Randomize call order a bit
switch (rnd.nextInt() & 3) {
case 0:
_testRead(jsonF, JSON_INPUT);
break;
case 1:
_testWrite(jsonF);
break;
case 2:
_testRead(jsonF, JSON_INPUT);
_testWrite(jsonF);
break;
default:
_testWrite(jsonF);
_testRead(jsonF, JSON_INPUT);
break;
}
} catch (Exception e) {
System.err.printf("ERROR: thread %d fail, will exit: (%s) %s\n",
threadId, e.getClass().getName(), e.getMessage());
break;
}
calls.incrementAndGet();
}
}

private void _testRead(JsonFactory jsonF, byte[] input) throws Exception
{
JsonParser p = jsonF.createParser(new ByteArrayInputStream(input));
while (p.nextToken() != null) {
;
}
p.close();
}

private void _testWrite(JsonFactory jsonF) throws Exception
{
StringWriter w = new StringWriter(16);
JsonGenerator g = jsonF.createGenerator(w);
g.writeStartArray();
g.writeString("foobar");
g.writeEndArray();
g.close();
}

public static void main(String[] args) throws Exception
{
RecyclerPoolTest test = new RecyclerPoolTest(THREAD_COUNT, RUNTIME_SECS);
test.testPool(JsonFactory.builder()
.recyclerPool(JsonRecyclerPools.newLockFreePool())
.build());
test.testPool(JsonFactory.builder()
.recyclerPool(JsonRecyclerPools.newConcurrentDequePool())
.build());
test.testPool(JsonFactory.builder()
.recyclerPool(JsonRecyclerPools.newBoundedPool(THREAD_COUNT - 5))
.build());
}
}

0 comments on commit 266b941

Please sign in to comment.