Skip to content

Commit

Permalink
Decrease lock contention in SingleThreadQueueExtent (#2594)
Browse files Browse the repository at this point in the history
* thread local extent

* avoid race conditions due to ChunkHolder pooling

* clean up JFR events, javadoc

* remove ThreadLocalPassthroughExtent
  • Loading branch information
SirYwell committed Mar 4, 2024
1 parent b6d691d commit 1642713
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 32 deletions.
Expand Up @@ -18,6 +18,7 @@
import com.fastasyncworldedit.core.queue.IQueueChunk;
import com.fastasyncworldedit.core.queue.IQueueExtent;
import com.sk89q.worldedit.MaxChangedBlocksException;
import com.sk89q.worldedit.extent.Extent;
import com.sk89q.worldedit.extent.clipboard.Clipboard;
import com.sk89q.worldedit.function.mask.BlockMask;
import com.sk89q.worldedit.function.mask.ExistingBlockMask;
Expand Down Expand Up @@ -45,6 +46,7 @@
public class ParallelQueueExtent extends PassthroughExtent {

private static final Logger LOGGER = LogManagerCompat.getLogger();
private static final ThreadLocal<Extent> extents = new ThreadLocal<>();

private final World world;
private final QueueHandler handler;
Expand Down Expand Up @@ -73,10 +75,36 @@ public ParallelQueueExtent(QueueHandler handler, World world, boolean fastmode)
this.fastmode = fastmode;
}

/**
* Removes the extent currently associated with the calling thread.
*/
public static void clearCurrentExtent() {
extents.remove();
}

/**
* Sets the extent associated with the calling thread.
*/
public static void setCurrentExtent(Extent extent) {
extents.set(extent);
}

private void enter(Extent extent) {
setCurrentExtent(extent);
}

private void exit() {
clearCurrentExtent();
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public IQueueExtent<IQueueChunk> getExtent() {
return (IQueueExtent<IQueueChunk>) super.getExtent();
Extent extent = extents.get();
if (extent == null) {
extent = super.getExtent();
}
return (IQueueExtent<IQueueChunk>) extent;
}

@Override
Expand Down Expand Up @@ -114,6 +142,7 @@ public <T extends Filter> T apply(Region region, T filter, boolean full) {
final SingleThreadQueueExtent queue = (SingleThreadQueueExtent) getNewQueue();
queue.setFastMode(fastmode);
queue.setFaweExceptionArray(faweExceptionReasonsUsed);
enter(queue);
synchronized (queue) {
try {
ChunkFilterBlock block = null;
Expand Down Expand Up @@ -154,6 +183,8 @@ public <T extends Filter> T apply(Region region, T filter, boolean full) {
exceptionCount++;
LOGGER.warn(message);
}
} finally {
exit();
}
})).toArray(ForkJoinTask[]::new);
// Join filters
Expand Down
Expand Up @@ -408,7 +408,7 @@ public IQueueExtent<IQueueChunk> create() {
* Sets the current thread's {@link IQueueExtent} instance in the queue pool to null.
*/
public void unCache() {
queuePool.set(null);
queuePool.remove();
}

private IQueueExtent<IQueueChunk> pool() {
Expand Down
Expand Up @@ -9,7 +9,6 @@
import com.fastasyncworldedit.core.extent.processor.ExtentBatchProcessorHolder;
import com.fastasyncworldedit.core.extent.processor.ProcessorScope;
import com.fastasyncworldedit.core.internal.exception.FaweException;
import com.fastasyncworldedit.core.queue.IChunk;
import com.fastasyncworldedit.core.queue.IChunkCache;
import com.fastasyncworldedit.core.queue.IChunkGet;
import com.fastasyncworldedit.core.queue.IChunkSet;
Expand Down Expand Up @@ -48,11 +47,9 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen

private static final Logger LOGGER = LogManagerCompat.getLogger();

// Pool discarded chunks for reuse (can safely be cleared by another thread)
// private static final ConcurrentLinkedQueue<IChunk> CHUNK_POOL = new ConcurrentLinkedQueue<>();
// Chunks currently being queued / worked on
private final Long2ObjectLinkedOpenHashMap<IQueueChunk> chunks = new Long2ObjectLinkedOpenHashMap<>();
private final ConcurrentLinkedQueue<Future> submissions = new ConcurrentLinkedQueue<>();
private final Long2ObjectLinkedOpenHashMap<IQueueChunk<?>> chunks = new Long2ObjectLinkedOpenHashMap<>();
private final ConcurrentLinkedQueue<Future<?>> submissions = new ConcurrentLinkedQueue<>();
private final ReentrantLock getChunkLock = new ReentrantLock();
private World world = null;
private int minY = 0;
Expand Down Expand Up @@ -142,12 +139,10 @@ protected synchronized void reset() {
if (!this.initialized) {
return;
}
if (!this.chunks.isEmpty()) {
getChunkLock.lock();
for (IChunk chunk : this.chunks.values()) {
chunk.recycle();
}
getChunkLock.lock();
try {
this.chunks.clear();
} finally {
getChunkLock.unlock();
}
this.enabledQueue = true;
Expand Down Expand Up @@ -234,7 +229,6 @@ private <V extends Future<V>> V submitUnchecked(IQueueChunk chunk) {
}
}
if (chunk.isEmpty()) {
chunk.recycle();
Future result = Futures.immediateFuture(null);
return (V) result;
}
Expand Down
@@ -1,7 +1,5 @@
package com.fastasyncworldedit.core.queue.implementation.chunk;

import com.fastasyncworldedit.core.FaweCache;
import com.fastasyncworldedit.core.configuration.Settings;
import com.fastasyncworldedit.core.extent.filter.block.ChunkFilterBlock;
import com.fastasyncworldedit.core.extent.processor.EmptyBatchProcessor;
import com.fastasyncworldedit.core.extent.processor.heightmap.HeightMapType;
Expand All @@ -11,36 +9,34 @@
import com.fastasyncworldedit.core.queue.IChunkSet;
import com.fastasyncworldedit.core.queue.IQueueChunk;
import com.fastasyncworldedit.core.queue.IQueueExtent;
import com.fastasyncworldedit.core.queue.Pool;
import com.fastasyncworldedit.core.queue.implementation.ParallelQueueExtent;
import com.fastasyncworldedit.core.util.MemUtil;
import com.sk89q.jnbt.CompoundTag;
import com.sk89q.worldedit.internal.util.LogManagerCompat;
import com.sk89q.worldedit.math.BlockVector3;
import com.sk89q.worldedit.regions.Region;
import com.sk89q.worldedit.world.biome.BiomeType;
import com.sk89q.worldedit.world.block.BaseBlock;
import com.sk89q.worldedit.world.block.BlockState;
import com.sk89q.worldedit.world.block.BlockStateHolder;
import org.apache.logging.log4j.Logger;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An abstract {@link IChunk} class that implements basic get/set blocks.
*/
@SuppressWarnings("rawtypes")
public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {

private static final Pool<ChunkHolder> POOL = FaweCache.INSTANCE.registerPool(
ChunkHolder.class,
ChunkHolder::new,
Settings.settings().QUEUE.POOL
);
private static final Logger LOGGER = LogManagerCompat.getLogger();

public static ChunkHolder newInstance() {
return POOL.poll();
return new ChunkHolder();
}

private volatile IChunkGet chunkExisting; // The existing chunk (e.g. a clipboard, or the world, before changes)
Expand All @@ -63,16 +59,12 @@ public void init(IBlockDelegate delegate) {
this.delegate = delegate;
}

private static final AtomicBoolean recycleWarning = new AtomicBoolean(false);
@Override
public synchronized void recycle() {
delegate = NULL;
if (chunkSet != null) {
chunkSet.recycle();
chunkSet = null;
public void recycle() {
if (!recycleWarning.getAndSet(true)) {
LOGGER.warn("ChunkHolder should not be recycled.", new Exception());
}
chunkExisting = null;
extent = null;
POOL.offer(this);
}

public long initAge() {
Expand Down Expand Up @@ -1018,7 +1010,6 @@ public synchronized T call() {
// Do nothing
});
}
recycle();
return null;
}

Expand All @@ -1031,6 +1022,7 @@ public synchronized T call(IChunkSet set, Runnable finalize) {
IChunkGet get = getOrCreateGet();
try {
get.lockCall();
trackExtent();
boolean postProcess = !(getExtent().getPostProcessor() instanceof EmptyBatchProcessor);
final int copyKey = get.setCreateCopy(postProcess);
final IChunkSet iChunkSet = getExtent().processSet(this, get, set);
Expand All @@ -1046,11 +1038,24 @@ public synchronized T call(IChunkSet set, Runnable finalize) {
return get.call(set, finalizer);
} finally {
get.unlockCall();
untrackExtent();
}
}
return null;
}

// "call" can be called by QueueHandler#blockingExecutor. In such case, we still want the other thread
// to use this SingleThreadQueueExtent. Otherwise, many threads might end up locking on **one** STQE.
// This way, locking is spread across multiple STQEs, allowing for better performance

private void trackExtent() {
ParallelQueueExtent.setCurrentExtent(extent);
}

private void untrackExtent() {
ParallelQueueExtent.clearCurrentExtent();
}

/**
* Get the extent this chunk is in.
*/
Expand Down
Expand Up @@ -32,6 +32,7 @@
import com.fastasyncworldedit.core.limit.FaweLimit;
import com.fastasyncworldedit.core.util.BrushCache;
import com.fastasyncworldedit.core.util.MainUtil;
import com.fastasyncworldedit.core.util.MaskTraverser;
import com.fastasyncworldedit.core.util.StringMan;
import com.fastasyncworldedit.core.util.TaskManager;
import com.fastasyncworldedit.core.util.TextureHolder;
Expand All @@ -53,6 +54,7 @@
import com.sk89q.worldedit.entity.Player;
import com.sk89q.worldedit.extension.platform.Actor;
import com.sk89q.worldedit.extension.platform.Locatable;
import com.sk89q.worldedit.extent.NullExtent;
import com.sk89q.worldedit.extent.clipboard.BlockArrayClipboard;
import com.sk89q.worldedit.extent.clipboard.Clipboard;
import com.sk89q.worldedit.extent.inventory.BlockBag;
Expand Down Expand Up @@ -594,6 +596,9 @@ public void remember(EditSession editSession, boolean append, int limitMb) {
long size = MainUtil.getSize(item);
historySize -= size;
}
// free the mask from any remaining references to e.g. extents
// if used again
new MaskTraverser(mask).reset(NullExtent.INSTANCE);
} finally {
historyWriteLock.unlock();
}
Expand Down

0 comments on commit 1642713

Please sign in to comment.