Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decrease lock contention in SingleThreadQueueExtent #2594

Merged
merged 4 commits into from Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -42,7 +42,7 @@
import java.util.concurrent.ForkJoinTask;
import java.util.stream.IntStream;

public class ParallelQueueExtent extends PassthroughExtent {
public class ParallelQueueExtent extends ThreadLocalPassthroughExtent {

private static final Logger LOGGER = LogManagerCompat.getLogger();

Expand Down Expand Up @@ -114,6 +114,7 @@ public <T extends Filter> T apply(Region region, T filter, boolean full) {
final SingleThreadQueueExtent queue = (SingleThreadQueueExtent) getNewQueue();
queue.setFastMode(fastmode);
queue.setFaweExceptionArray(faweExceptionReasonsUsed);
enter(queue);
synchronized (queue) {
try {
ChunkFilterBlock block = null;
Expand Down Expand Up @@ -154,6 +155,8 @@ public <T extends Filter> T apply(Region region, T filter, boolean full) {
exceptionCount++;
LOGGER.warn(message);
}
} finally {
exit();
}
})).toArray(ForkJoinTask[]::new);
// Join filters
Expand Down
Expand Up @@ -408,7 +408,7 @@ public IQueueExtent<IQueueChunk> create() {
* Sets the current thread's {@link IQueueExtent} instance in the queue pool to null.
*/
public void unCache() {
queuePool.set(null);
queuePool.remove();
}

private IQueueExtent<IQueueChunk> pool() {
Expand Down
Expand Up @@ -9,7 +9,6 @@
import com.fastasyncworldedit.core.extent.processor.ExtentBatchProcessorHolder;
import com.fastasyncworldedit.core.extent.processor.ProcessorScope;
import com.fastasyncworldedit.core.internal.exception.FaweException;
import com.fastasyncworldedit.core.queue.IChunk;
import com.fastasyncworldedit.core.queue.IChunkCache;
import com.fastasyncworldedit.core.queue.IChunkGet;
import com.fastasyncworldedit.core.queue.IChunkSet;
Expand Down Expand Up @@ -48,11 +47,9 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen

private static final Logger LOGGER = LogManagerCompat.getLogger();

// Pool discarded chunks for reuse (can safely be cleared by another thread)
// private static final ConcurrentLinkedQueue<IChunk> CHUNK_POOL = new ConcurrentLinkedQueue<>();
// Chunks currently being queued / worked on
private final Long2ObjectLinkedOpenHashMap<IQueueChunk> chunks = new Long2ObjectLinkedOpenHashMap<>();
private final ConcurrentLinkedQueue<Future> submissions = new ConcurrentLinkedQueue<>();
private final Long2ObjectLinkedOpenHashMap<IQueueChunk<?>> chunks = new Long2ObjectLinkedOpenHashMap<>();
private final ConcurrentLinkedQueue<Future<?>> submissions = new ConcurrentLinkedQueue<>();
private final ReentrantLock getChunkLock = new ReentrantLock();
private World world = null;
private int minY = 0;
Expand Down Expand Up @@ -142,12 +139,10 @@ protected synchronized void reset() {
if (!this.initialized) {
return;
}
if (!this.chunks.isEmpty()) {
getChunkLock.lock();
for (IChunk chunk : this.chunks.values()) {
chunk.recycle();
}
getChunkLock.lock();
try {
this.chunks.clear();
} finally {
getChunkLock.unlock();
}
this.enabledQueue = true;
Expand Down Expand Up @@ -234,7 +229,6 @@ private <V extends Future<V>> V submitUnchecked(IQueueChunk chunk) {
}
}
if (chunk.isEmpty()) {
chunk.recycle();
Future result = Futures.immediateFuture(null);
return (V) result;
}
Expand Down
@@ -0,0 +1,65 @@
package com.fastasyncworldedit.core.queue.implementation;

import com.fastasyncworldedit.core.extent.PassthroughExtent;
import com.sk89q.worldedit.extent.Extent;
import org.jetbrains.annotations.ApiStatus;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* This extent maintains a mapping from Threads to Extents.
* Whenever an implementation calls {@link #getExtent()}, it will get the extent
* associated with the current thread, or the one given by the super class if no mapping
* for this thread exist.
* <p>
* There are two ways how to establish a mapping:
* <ol>
* <il>by calling {@link #enter(Extent)}.
* This should be called paired with {@link #exit()} to clear the mapping again.</il>
* <il>by calling {@link #setCurrentExtent(Extent)}.
* This should be called paired with {@link #clearCurrent()}</il>
* </ol>
*
* The first can be used when calling it in the context of a {@link ThreadLocalPassthroughExtent}.
* The static methods can be called from everywhere, but this requires extra attention to make sure no
* wrong mapping is kept.
*
* @since TODO
*/
@ApiStatus.Internal
public class ThreadLocalPassthroughExtent extends PassthroughExtent {

private static final ConcurrentMap<Thread, Extent> extents = new ConcurrentHashMap<>();

/**
* Create a new instance.
*
* @param extent the extent
*/
public ThreadLocalPassthroughExtent(final Extent extent) {
super(extent);
}

public static void clearCurrent() {
extents.remove(Thread.currentThread());
}

public static void setCurrentExtent(Extent extent) {
extents.put(Thread.currentThread(), extent);
}

public void enter(Extent extent) {
extents.put(Thread.currentThread(), extent);
}

public void exit() {
extents.remove(Thread.currentThread());
}

@Override
public Extent getExtent() {
return extents.getOrDefault(Thread.currentThread(), super.getExtent());
}

}
@@ -1,7 +1,5 @@
package com.fastasyncworldedit.core.queue.implementation.chunk;

import com.fastasyncworldedit.core.FaweCache;
import com.fastasyncworldedit.core.configuration.Settings;
import com.fastasyncworldedit.core.extent.filter.block.ChunkFilterBlock;
import com.fastasyncworldedit.core.extent.processor.EmptyBatchProcessor;
import com.fastasyncworldedit.core.extent.processor.heightmap.HeightMapType;
Expand All @@ -11,36 +9,34 @@
import com.fastasyncworldedit.core.queue.IChunkSet;
import com.fastasyncworldedit.core.queue.IQueueChunk;
import com.fastasyncworldedit.core.queue.IQueueExtent;
import com.fastasyncworldedit.core.queue.Pool;
import com.fastasyncworldedit.core.queue.implementation.ThreadLocalPassthroughExtent;
import com.fastasyncworldedit.core.util.MemUtil;
import com.sk89q.jnbt.CompoundTag;
import com.sk89q.worldedit.internal.util.LogManagerCompat;
import com.sk89q.worldedit.math.BlockVector3;
import com.sk89q.worldedit.regions.Region;
import com.sk89q.worldedit.world.biome.BiomeType;
import com.sk89q.worldedit.world.block.BaseBlock;
import com.sk89q.worldedit.world.block.BlockState;
import com.sk89q.worldedit.world.block.BlockStateHolder;
import org.apache.logging.log4j.Logger;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An abstract {@link IChunk} class that implements basic get/set blocks.
*/
@SuppressWarnings("rawtypes")
public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {

private static final Pool<ChunkHolder> POOL = FaweCache.INSTANCE.registerPool(
ChunkHolder.class,
ChunkHolder::new,
Settings.settings().QUEUE.POOL
);
private static final Logger LOGGER = LogManagerCompat.getLogger();

public static ChunkHolder newInstance() {
return POOL.poll();
return new ChunkHolder();
}

private volatile IChunkGet chunkExisting; // The existing chunk (e.g. a clipboard, or the world, before changes)
Expand All @@ -63,16 +59,12 @@ public void init(IBlockDelegate delegate) {
this.delegate = delegate;
}

private static final AtomicBoolean recycleWarning = new AtomicBoolean(false);
@Override
public synchronized void recycle() {
delegate = NULL;
if (chunkSet != null) {
chunkSet.recycle();
chunkSet = null;
public void recycle() {
if (!recycleWarning.getAndSet(true)) {
LOGGER.warn("ChunkHolder should not be recycled.", new Exception());
}
chunkExisting = null;
extent = null;
POOL.offer(this);
}

public long initAge() {
Expand Down Expand Up @@ -1018,7 +1010,6 @@ public synchronized T call() {
// Do nothing
});
}
recycle();
return null;
}

Expand All @@ -1031,6 +1022,7 @@ public synchronized T call(IChunkSet set, Runnable finalize) {
IChunkGet get = getOrCreateGet();
try {
get.lockCall();
trackExtent();
boolean postProcess = !(getExtent().getPostProcessor() instanceof EmptyBatchProcessor);
final int copyKey = get.setCreateCopy(postProcess);
final IChunkSet iChunkSet = getExtent().processSet(this, get, set);
Expand All @@ -1046,11 +1038,24 @@ public synchronized T call(IChunkSet set, Runnable finalize) {
return get.call(set, finalizer);
} finally {
get.unlockCall();
untrackExtent();
}
}
return null;
}

// "call" can be called by QueueHandler#blockingExecutor. In such case, we still want the other thread
// to use this SingleThreadQueueExtent. Otherwise, many threads might end up locking on **one** STQE.
// This way, locking is spread across multiple STQEs, allowing for better performance

private void trackExtent() {
ThreadLocalPassthroughExtent.setCurrentExtent(extent);
}

private void untrackExtent() {
ThreadLocalPassthroughExtent.clearCurrent();
}

/**
* Get the extent this chunk is in.
*/
Expand Down
Expand Up @@ -32,6 +32,7 @@
import com.fastasyncworldedit.core.limit.FaweLimit;
import com.fastasyncworldedit.core.util.BrushCache;
import com.fastasyncworldedit.core.util.MainUtil;
import com.fastasyncworldedit.core.util.MaskTraverser;
import com.fastasyncworldedit.core.util.StringMan;
import com.fastasyncworldedit.core.util.TaskManager;
import com.fastasyncworldedit.core.util.TextureHolder;
Expand All @@ -53,6 +54,7 @@
import com.sk89q.worldedit.entity.Player;
import com.sk89q.worldedit.extension.platform.Actor;
import com.sk89q.worldedit.extension.platform.Locatable;
import com.sk89q.worldedit.extent.NullExtent;
import com.sk89q.worldedit.extent.clipboard.BlockArrayClipboard;
import com.sk89q.worldedit.extent.clipboard.Clipboard;
import com.sk89q.worldedit.extent.inventory.BlockBag;
Expand Down Expand Up @@ -594,6 +596,9 @@ public void remember(EditSession editSession, boolean append, int limitMb) {
long size = MainUtil.getSize(item);
historySize -= size;
}
// free the mask from any remaining references to e.g. extents
// if used again
new MaskTraverser(mask).reset(NullExtent.INSTANCE);
} finally {
historyWriteLock.unlock();
}
Expand Down