diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/ParallelQueueExtent.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/ParallelQueueExtent.java index 2d2b45aae9..52e6cf2c68 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/ParallelQueueExtent.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/ParallelQueueExtent.java @@ -18,6 +18,7 @@ import com.fastasyncworldedit.core.queue.IQueueChunk; import com.fastasyncworldedit.core.queue.IQueueExtent; import com.sk89q.worldedit.MaxChangedBlocksException; +import com.sk89q.worldedit.extent.Extent; import com.sk89q.worldedit.extent.clipboard.Clipboard; import com.sk89q.worldedit.function.mask.BlockMask; import com.sk89q.worldedit.function.mask.ExistingBlockMask; @@ -45,6 +46,7 @@ public class ParallelQueueExtent extends PassthroughExtent { private static final Logger LOGGER = LogManagerCompat.getLogger(); + private static final ThreadLocal extents = new ThreadLocal<>(); private final World world; private final QueueHandler handler; @@ -73,10 +75,36 @@ public ParallelQueueExtent(QueueHandler handler, World world, boolean fastmode) this.fastmode = fastmode; } + /** + * Removes the extent currently associated with the calling thread. + */ + public static void clearCurrentExtent() { + extents.remove(); + } + + /** + * Sets the extent associated with the calling thread. + */ + public static void setCurrentExtent(Extent extent) { + extents.set(extent); + } + + private void enter(Extent extent) { + setCurrentExtent(extent); + } + + private void exit() { + clearCurrentExtent(); + } + @Override @SuppressWarnings({"unchecked", "rawtypes"}) public IQueueExtent getExtent() { - return (IQueueExtent) super.getExtent(); + Extent extent = extents.get(); + if (extent == null) { + extent = super.getExtent(); + } + return (IQueueExtent) extent; } @Override @@ -114,6 +142,7 @@ public T apply(Region region, T filter, boolean full) { final SingleThreadQueueExtent queue = (SingleThreadQueueExtent) getNewQueue(); queue.setFastMode(fastmode); queue.setFaweExceptionArray(faweExceptionReasonsUsed); + enter(queue); synchronized (queue) { try { ChunkFilterBlock block = null; @@ -154,6 +183,8 @@ public T apply(Region region, T filter, boolean full) { exceptionCount++; LOGGER.warn(message); } + } finally { + exit(); } })).toArray(ForkJoinTask[]::new); // Join filters diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java index 956c33fb2a..7bdbf46455 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java @@ -408,7 +408,7 @@ public IQueueExtent create() { * Sets the current thread's {@link IQueueExtent} instance in the queue pool to null. */ public void unCache() { - queuePool.set(null); + queuePool.remove(); } private IQueueExtent pool() { diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java index 132229d1de..6e06ce3481 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java @@ -9,7 +9,6 @@ import com.fastasyncworldedit.core.extent.processor.ExtentBatchProcessorHolder; import com.fastasyncworldedit.core.extent.processor.ProcessorScope; import com.fastasyncworldedit.core.internal.exception.FaweException; -import com.fastasyncworldedit.core.queue.IChunk; import com.fastasyncworldedit.core.queue.IChunkCache; import com.fastasyncworldedit.core.queue.IChunkGet; import com.fastasyncworldedit.core.queue.IChunkSet; @@ -48,11 +47,9 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen private static final Logger LOGGER = LogManagerCompat.getLogger(); - // Pool discarded chunks for reuse (can safely be cleared by another thread) - // private static final ConcurrentLinkedQueue CHUNK_POOL = new ConcurrentLinkedQueue<>(); // Chunks currently being queued / worked on - private final Long2ObjectLinkedOpenHashMap chunks = new Long2ObjectLinkedOpenHashMap<>(); - private final ConcurrentLinkedQueue submissions = new ConcurrentLinkedQueue<>(); + private final Long2ObjectLinkedOpenHashMap> chunks = new Long2ObjectLinkedOpenHashMap<>(); + private final ConcurrentLinkedQueue> submissions = new ConcurrentLinkedQueue<>(); private final ReentrantLock getChunkLock = new ReentrantLock(); private World world = null; private int minY = 0; @@ -142,12 +139,10 @@ protected synchronized void reset() { if (!this.initialized) { return; } - if (!this.chunks.isEmpty()) { - getChunkLock.lock(); - for (IChunk chunk : this.chunks.values()) { - chunk.recycle(); - } + getChunkLock.lock(); + try { this.chunks.clear(); + } finally { getChunkLock.unlock(); } this.enabledQueue = true; @@ -234,7 +229,6 @@ private > V submitUnchecked(IQueueChunk chunk) { } } if (chunk.isEmpty()) { - chunk.recycle(); Future result = Futures.immediateFuture(null); return (V) result; } diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java index 6103a1649c..a7417eddf5 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java @@ -1,7 +1,5 @@ package com.fastasyncworldedit.core.queue.implementation.chunk; -import com.fastasyncworldedit.core.FaweCache; -import com.fastasyncworldedit.core.configuration.Settings; import com.fastasyncworldedit.core.extent.filter.block.ChunkFilterBlock; import com.fastasyncworldedit.core.extent.processor.EmptyBatchProcessor; import com.fastasyncworldedit.core.extent.processor.heightmap.HeightMapType; @@ -11,36 +9,34 @@ import com.fastasyncworldedit.core.queue.IChunkSet; import com.fastasyncworldedit.core.queue.IQueueChunk; import com.fastasyncworldedit.core.queue.IQueueExtent; -import com.fastasyncworldedit.core.queue.Pool; +import com.fastasyncworldedit.core.queue.implementation.ParallelQueueExtent; import com.fastasyncworldedit.core.util.MemUtil; import com.sk89q.jnbt.CompoundTag; +import com.sk89q.worldedit.internal.util.LogManagerCompat; import com.sk89q.worldedit.math.BlockVector3; import com.sk89q.worldedit.regions.Region; import com.sk89q.worldedit.world.biome.BiomeType; import com.sk89q.worldedit.world.block.BaseBlock; import com.sk89q.worldedit.world.block.BlockState; import com.sk89q.worldedit.world.block.BlockStateHolder; +import org.apache.logging.log4j.Logger; import javax.annotation.Nullable; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; /** * An abstract {@link IChunk} class that implements basic get/set blocks. */ @SuppressWarnings("rawtypes") public class ChunkHolder> implements IQueueChunk { - - private static final Pool POOL = FaweCache.INSTANCE.registerPool( - ChunkHolder.class, - ChunkHolder::new, - Settings.settings().QUEUE.POOL - ); + private static final Logger LOGGER = LogManagerCompat.getLogger(); public static ChunkHolder newInstance() { - return POOL.poll(); + return new ChunkHolder(); } private volatile IChunkGet chunkExisting; // The existing chunk (e.g. a clipboard, or the world, before changes) @@ -63,16 +59,12 @@ public void init(IBlockDelegate delegate) { this.delegate = delegate; } + private static final AtomicBoolean recycleWarning = new AtomicBoolean(false); @Override - public synchronized void recycle() { - delegate = NULL; - if (chunkSet != null) { - chunkSet.recycle(); - chunkSet = null; + public void recycle() { + if (!recycleWarning.getAndSet(true)) { + LOGGER.warn("ChunkHolder should not be recycled.", new Exception()); } - chunkExisting = null; - extent = null; - POOL.offer(this); } public long initAge() { @@ -1018,7 +1010,6 @@ public synchronized T call() { // Do nothing }); } - recycle(); return null; } @@ -1031,6 +1022,7 @@ public synchronized T call(IChunkSet set, Runnable finalize) { IChunkGet get = getOrCreateGet(); try { get.lockCall(); + trackExtent(); boolean postProcess = !(getExtent().getPostProcessor() instanceof EmptyBatchProcessor); final int copyKey = get.setCreateCopy(postProcess); final IChunkSet iChunkSet = getExtent().processSet(this, get, set); @@ -1046,11 +1038,24 @@ public synchronized T call(IChunkSet set, Runnable finalize) { return get.call(set, finalizer); } finally { get.unlockCall(); + untrackExtent(); } } return null; } + // "call" can be called by QueueHandler#blockingExecutor. In such case, we still want the other thread + // to use this SingleThreadQueueExtent. Otherwise, many threads might end up locking on **one** STQE. + // This way, locking is spread across multiple STQEs, allowing for better performance + + private void trackExtent() { + ParallelQueueExtent.setCurrentExtent(extent); + } + + private void untrackExtent() { + ParallelQueueExtent.clearCurrentExtent(); + } + /** * Get the extent this chunk is in. */ diff --git a/worldedit-core/src/main/java/com/sk89q/worldedit/LocalSession.java b/worldedit-core/src/main/java/com/sk89q/worldedit/LocalSession.java index 67bbff7af9..a00b26702f 100644 --- a/worldedit-core/src/main/java/com/sk89q/worldedit/LocalSession.java +++ b/worldedit-core/src/main/java/com/sk89q/worldedit/LocalSession.java @@ -32,6 +32,7 @@ import com.fastasyncworldedit.core.limit.FaweLimit; import com.fastasyncworldedit.core.util.BrushCache; import com.fastasyncworldedit.core.util.MainUtil; +import com.fastasyncworldedit.core.util.MaskTraverser; import com.fastasyncworldedit.core.util.StringMan; import com.fastasyncworldedit.core.util.TaskManager; import com.fastasyncworldedit.core.util.TextureHolder; @@ -53,6 +54,7 @@ import com.sk89q.worldedit.entity.Player; import com.sk89q.worldedit.extension.platform.Actor; import com.sk89q.worldedit.extension.platform.Locatable; +import com.sk89q.worldedit.extent.NullExtent; import com.sk89q.worldedit.extent.clipboard.BlockArrayClipboard; import com.sk89q.worldedit.extent.clipboard.Clipboard; import com.sk89q.worldedit.extent.inventory.BlockBag; @@ -594,6 +596,9 @@ public void remember(EditSession editSession, boolean append, int limitMb) { long size = MainUtil.getSize(item); historySize -= size; } + // free the mask from any remaining references to e.g. extents + // if used again + new MaskTraverser(mask).reset(NullExtent.INSTANCE); } finally { historyWriteLock.unlock(); }