Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decrease lock contention in SingleThreadQueueExtent #2594

Merged
merged 4 commits into from Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,6 +18,7 @@
import com.fastasyncworldedit.core.queue.IQueueChunk;
import com.fastasyncworldedit.core.queue.IQueueExtent;
import com.sk89q.worldedit.MaxChangedBlocksException;
import com.sk89q.worldedit.extent.Extent;
import com.sk89q.worldedit.extent.clipboard.Clipboard;
import com.sk89q.worldedit.function.mask.BlockMask;
import com.sk89q.worldedit.function.mask.ExistingBlockMask;
Expand Down Expand Up @@ -45,6 +46,7 @@
public class ParallelQueueExtent extends PassthroughExtent {

private static final Logger LOGGER = LogManagerCompat.getLogger();
private static final ThreadLocal<Extent> extents = new ThreadLocal<>();

private final World world;
private final QueueHandler handler;
Expand Down Expand Up @@ -73,10 +75,36 @@ public ParallelQueueExtent(QueueHandler handler, World world, boolean fastmode)
this.fastmode = fastmode;
}

/**
* Removes the extent currently associated with the calling thread.
*/
public static void clearCurrentExtent() {
extents.remove();
}

/**
* Sets the extent associated with the calling thread.
*/
public static void setCurrentExtent(Extent extent) {
extents.set(extent);
}

private void enter(Extent extent) {
setCurrentExtent(extent);
}

private void exit() {
clearCurrentExtent();
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public IQueueExtent<IQueueChunk> getExtent() {
return (IQueueExtent<IQueueChunk>) super.getExtent();
Extent extent = extents.get();
if (extent == null) {
extent = super.getExtent();
}
return (IQueueExtent<IQueueChunk>) extent;
}

@Override
Expand Down Expand Up @@ -114,6 +142,7 @@ public <T extends Filter> T apply(Region region, T filter, boolean full) {
final SingleThreadQueueExtent queue = (SingleThreadQueueExtent) getNewQueue();
queue.setFastMode(fastmode);
queue.setFaweExceptionArray(faweExceptionReasonsUsed);
enter(queue);
synchronized (queue) {
try {
ChunkFilterBlock block = null;
Expand Down Expand Up @@ -154,6 +183,8 @@ public <T extends Filter> T apply(Region region, T filter, boolean full) {
exceptionCount++;
LOGGER.warn(message);
}
} finally {
exit();
}
})).toArray(ForkJoinTask[]::new);
// Join filters
Expand Down
Expand Up @@ -408,7 +408,7 @@ public IQueueExtent<IQueueChunk> create() {
* Sets the current thread's {@link IQueueExtent} instance in the queue pool to null.
*/
public void unCache() {
queuePool.set(null);
queuePool.remove();
}

private IQueueExtent<IQueueChunk> pool() {
Expand Down
Expand Up @@ -9,7 +9,6 @@
import com.fastasyncworldedit.core.extent.processor.ExtentBatchProcessorHolder;
import com.fastasyncworldedit.core.extent.processor.ProcessorScope;
import com.fastasyncworldedit.core.internal.exception.FaweException;
import com.fastasyncworldedit.core.queue.IChunk;
import com.fastasyncworldedit.core.queue.IChunkCache;
import com.fastasyncworldedit.core.queue.IChunkGet;
import com.fastasyncworldedit.core.queue.IChunkSet;
Expand Down Expand Up @@ -48,11 +47,9 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen

private static final Logger LOGGER = LogManagerCompat.getLogger();

// Pool discarded chunks for reuse (can safely be cleared by another thread)
// private static final ConcurrentLinkedQueue<IChunk> CHUNK_POOL = new ConcurrentLinkedQueue<>();
// Chunks currently being queued / worked on
private final Long2ObjectLinkedOpenHashMap<IQueueChunk> chunks = new Long2ObjectLinkedOpenHashMap<>();
private final ConcurrentLinkedQueue<Future> submissions = new ConcurrentLinkedQueue<>();
private final Long2ObjectLinkedOpenHashMap<IQueueChunk<?>> chunks = new Long2ObjectLinkedOpenHashMap<>();
private final ConcurrentLinkedQueue<Future<?>> submissions = new ConcurrentLinkedQueue<>();
private final ReentrantLock getChunkLock = new ReentrantLock();
private World world = null;
private int minY = 0;
Expand Down Expand Up @@ -142,12 +139,10 @@ protected synchronized void reset() {
if (!this.initialized) {
return;
}
if (!this.chunks.isEmpty()) {
getChunkLock.lock();
for (IChunk chunk : this.chunks.values()) {
chunk.recycle();
}
getChunkLock.lock();
try {
this.chunks.clear();
} finally {
getChunkLock.unlock();
}
this.enabledQueue = true;
Expand Down Expand Up @@ -234,7 +229,6 @@ private <V extends Future<V>> V submitUnchecked(IQueueChunk chunk) {
}
}
if (chunk.isEmpty()) {
chunk.recycle();
Future result = Futures.immediateFuture(null);
return (V) result;
}
Expand Down
@@ -1,7 +1,5 @@
package com.fastasyncworldedit.core.queue.implementation.chunk;

import com.fastasyncworldedit.core.FaweCache;
import com.fastasyncworldedit.core.configuration.Settings;
import com.fastasyncworldedit.core.extent.filter.block.ChunkFilterBlock;
import com.fastasyncworldedit.core.extent.processor.EmptyBatchProcessor;
import com.fastasyncworldedit.core.extent.processor.heightmap.HeightMapType;
Expand All @@ -11,36 +9,34 @@
import com.fastasyncworldedit.core.queue.IChunkSet;
import com.fastasyncworldedit.core.queue.IQueueChunk;
import com.fastasyncworldedit.core.queue.IQueueExtent;
import com.fastasyncworldedit.core.queue.Pool;
import com.fastasyncworldedit.core.queue.implementation.ParallelQueueExtent;
import com.fastasyncworldedit.core.util.MemUtil;
import com.sk89q.jnbt.CompoundTag;
import com.sk89q.worldedit.internal.util.LogManagerCompat;
import com.sk89q.worldedit.math.BlockVector3;
import com.sk89q.worldedit.regions.Region;
import com.sk89q.worldedit.world.biome.BiomeType;
import com.sk89q.worldedit.world.block.BaseBlock;
import com.sk89q.worldedit.world.block.BlockState;
import com.sk89q.worldedit.world.block.BlockStateHolder;
import org.apache.logging.log4j.Logger;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An abstract {@link IChunk} class that implements basic get/set blocks.
*/
@SuppressWarnings("rawtypes")
public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {

private static final Pool<ChunkHolder> POOL = FaweCache.INSTANCE.registerPool(
ChunkHolder.class,
ChunkHolder::new,
Settings.settings().QUEUE.POOL
);
private static final Logger LOGGER = LogManagerCompat.getLogger();

public static ChunkHolder newInstance() {
return POOL.poll();
return new ChunkHolder();
}

private volatile IChunkGet chunkExisting; // The existing chunk (e.g. a clipboard, or the world, before changes)
Expand All @@ -63,16 +59,12 @@ public void init(IBlockDelegate delegate) {
this.delegate = delegate;
}

private static final AtomicBoolean recycleWarning = new AtomicBoolean(false);
@Override
public synchronized void recycle() {
delegate = NULL;
if (chunkSet != null) {
chunkSet.recycle();
chunkSet = null;
public void recycle() {
if (!recycleWarning.getAndSet(true)) {
LOGGER.warn("ChunkHolder should not be recycled.", new Exception());
}
chunkExisting = null;
extent = null;
POOL.offer(this);
}

public long initAge() {
Expand Down Expand Up @@ -1018,7 +1010,6 @@ public synchronized T call() {
// Do nothing
});
}
recycle();
return null;
}

Expand All @@ -1031,6 +1022,7 @@ public synchronized T call(IChunkSet set, Runnable finalize) {
IChunkGet get = getOrCreateGet();
try {
get.lockCall();
trackExtent();
boolean postProcess = !(getExtent().getPostProcessor() instanceof EmptyBatchProcessor);
final int copyKey = get.setCreateCopy(postProcess);
final IChunkSet iChunkSet = getExtent().processSet(this, get, set);
Expand All @@ -1046,11 +1038,24 @@ public synchronized T call(IChunkSet set, Runnable finalize) {
return get.call(set, finalizer);
} finally {
get.unlockCall();
untrackExtent();
}
}
return null;
}

// "call" can be called by QueueHandler#blockingExecutor. In such case, we still want the other thread
// to use this SingleThreadQueueExtent. Otherwise, many threads might end up locking on **one** STQE.
// This way, locking is spread across multiple STQEs, allowing for better performance

private void trackExtent() {
ParallelQueueExtent.setCurrentExtent(extent);
}

private void untrackExtent() {
ParallelQueueExtent.clearCurrentExtent();
}

/**
* Get the extent this chunk is in.
*/
Expand Down
Expand Up @@ -32,6 +32,7 @@
import com.fastasyncworldedit.core.limit.FaweLimit;
import com.fastasyncworldedit.core.util.BrushCache;
import com.fastasyncworldedit.core.util.MainUtil;
import com.fastasyncworldedit.core.util.MaskTraverser;
import com.fastasyncworldedit.core.util.StringMan;
import com.fastasyncworldedit.core.util.TaskManager;
import com.fastasyncworldedit.core.util.TextureHolder;
Expand All @@ -53,6 +54,7 @@
import com.sk89q.worldedit.entity.Player;
import com.sk89q.worldedit.extension.platform.Actor;
import com.sk89q.worldedit.extension.platform.Locatable;
import com.sk89q.worldedit.extent.NullExtent;
import com.sk89q.worldedit.extent.clipboard.BlockArrayClipboard;
import com.sk89q.worldedit.extent.clipboard.Clipboard;
import com.sk89q.worldedit.extent.inventory.BlockBag;
Expand Down Expand Up @@ -594,6 +596,9 @@ public void remember(EditSession editSession, boolean append, int limitMb) {
long size = MainUtil.getSize(item);
historySize -= size;
}
// free the mask from any remaining references to e.g. extents
// if used again
new MaskTraverser(mask).reset(NullExtent.INSTANCE);
} finally {
historyWriteLock.unlock();
}
Expand Down