feat: rework chunk ordering using Reactor #4822
base: develop
Conversation
oi, didn't expect this. I'll have to take a look tonight.
```diff
@@ -44,7 +44,7 @@
 class LocalChunkProviderTest {
 
-    private static final int WAIT_CHUNK_IS_READY_IN_SECONDS = 5;
+    private static final int WAIT_CHUNK_IS_READY_IN_SECONDS = 30;
```
While 5 is long enough for my computer, it appears that the CI machine does unfortunately need 30 seconds to generate all the chunks. It probably has a lot of threads running at once.
```diff
@@ -118,6 +117,12 @@ public void removeRelevanceEntity(EntityRef entity) {
         }
     }
 
+    public Stream<Vector3i> neededChunks() {
```
I think that we should try to explain what we're doing here when writing new methods, or at least add a docstring to `region` to make it simpler for others to figure out what is happening here.
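For illustration, a docstring along these lines might be what's being asked for. This is only a sketch: the class below and its `String` positions are hypothetical stand-ins, not the engine's actual types; the "duplicates under overlapping regions" caveat comes from the discussion further down in this thread.

```java
import java.util.List;
import java.util.stream.Stream;

/** Simplified stand-in for the relevance system, for illustration only. */
public class RelevanceSystemSketch {
    private final List<String> pending = List.of("0,0,0", "1,0,0");

    /**
     * Returns the positions of chunks that lie inside some client's relevance
     * region but are not ready yet. A position can appear more than once when
     * clients' relevance regions overlap, so callers should deduplicate.
     */
    public Stream<String> neededChunks() {
        return pending.stream();
    }

    public static void main(String[] args) {
        System.out.println(new RelevanceSystemSketch().neededChunks().count()); // 2
    }
}
```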
```java
relevanceSystem.neededChunks()
        .filter(pos -> !chunksInRange.contains(pos))
        .forEach(chunksInRange::add);
```
Not sure which one is "better", as this is already pretty clear in what it does. In general, I try to push the side effect to the outside, e.g., instead of adding to another list from a `forEach` I'd do:
```diff
- relevanceSystem.neededChunks()
-         .filter(pos -> !chunksInRange.contains(pos))
-         .forEach(chunksInRange::add);
+ chunksInRange.addAll(
+         relevanceSystem.neededChunks()
+                 .filter(pos -> !chunksInRange.contains(pos))
+                 .collect(Collectors.toList()));
```
However, this adds a collector we don't really need here, and the original solution is pretty clean already.
That is a little bit clearer, but I think I would prefer keeping it as is to avoid allocating the extra list.
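To make the trade-off concrete, here is a stdlib-only sketch (with `Integer` positions standing in for `Vector3i`, and hypothetical method names) showing that both variants produce the same list, with the second one allocating an intermediate list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class MergeVariants {
    // Variant 1 (the original shape): side effect inside forEach.
    static List<Integer> mergeWithForEach(List<Integer> range, List<Integer> needed) {
        List<Integer> out = new ArrayList<>(range);
        needed.stream().filter(pos -> !out.contains(pos)).forEach(out::add);
        return out;
    }

    // Variant 2: collect first, then addAll. The side effect is pushed outside,
    // at the cost of one intermediate list allocation.
    static List<Integer> mergeWithAddAll(List<Integer> range, List<Integer> needed) {
        List<Integer> out = new ArrayList<>(range);
        out.addAll(needed.stream().filter(pos -> !out.contains(pos)).collect(Collectors.toList()));
        return out;
    }

    public static void main(String[] args) {
        List<Integer> range = List.of(1, 2);
        List<Integer> needed = List.of(2, 3, 4);
        System.out.println(mergeWithForEach(range, needed)); // [1, 2, 3, 4]
        System.out.println(mergeWithAddAll(range, needed));  // [1, 2, 3, 4]
    }
}
```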
```java
        .filter(pos -> !chunksInRange.contains(pos))
        .forEach(chunksInRange::add);
chunksInRange.removeIf(x -> !relevanceSystem.isChunkInRegions(x) || isChunkReady(x));
chunksInRange.sort(relevanceSystem.createChunkPosComparator().reversed());
```
Could there be duplicates ending up in `chunksInRange` for some reason?
I don't think so; the only place we add to `chunksInRange` is the line above, which filters out duplicates.
Oh wait, duplicates actually are possible when there are multiple clients connected with overlapping relevance regions, since there will be duplicates in `neededChunks`. However, that shouldn't actually cause any problems because the code to generate chunks checks `currentlyProcessing` first.
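A small stdlib sketch (hypothetical names, `Integer` standing in for `Vector3ic`) of why such duplicates are harmless when a `currentlyProcessing`-style set guards the work:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ProcessingGuard {
    // Positions already handed out to workers; the real code keeps a similar set.
    private final Set<Integer> currentlyProcessing = new HashSet<>();

    /** Returns the positions actually accepted for generation; in-flight positions are skipped. */
    List<Integer> accept(List<Integer> requested) {
        List<Integer> accepted = new ArrayList<>();
        for (Integer pos : requested) {
            if (currentlyProcessing.add(pos)) { // add() returns false if already present
                accepted.add(pos);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        ProcessingGuard guard = new ProcessingGuard();
        // Two overlapping relevance regions can request the same position twice...
        System.out.println(guard.accept(List.of(7, 8, 7))); // [7, 8]
        // ...and a later request for a position that is still in flight is also ignored.
        System.out.println(guard.accept(List.of(8, 9)));    // [9]
    }
}
```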
```java
private Chunk genChunk(Vector3ic pos) {
    ChunkStore chunkStore = storageManager.loadChunkStore(pos);
    Chunk chunk;
    EntityBufferImpl buffer = new EntityBufferImpl();
```
Can we move this inside the `if` to only create a new instance if we really need it?
That's a good idea, I just moved this code from somewhere else in the file but I'll change it.
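For illustration, the shape of that refactor (the condition and helper names below are hypothetical, standing in for the store check and `new EntityBufferImpl()`): the object is allocated only on the branch that needs it.

```java
public class LazyAllocation {
    static int allocations = 0;

    static String newBuffer() {        // stands in for an expensive constructor call
        allocations++;
        return "buffer";
    }

    static String load(boolean haveStoredChunk) {
        if (haveStoredChunk) {
            return "from store";       // no buffer needed on this path
        }
        String buffer = newBuffer();   // created only when generating a fresh chunk
        return "generated with " + buffer;
    }

    public static void main(String[] args) {
        load(true);
        System.out.println(allocations); // 0: nothing allocated on the store path
        load(false);
        System.out.println(allocations); // 1
    }
}
```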
```java
scheduler = Schedulers.newParallel("chunk processing", NUM_TASK_THREADS);
Flux<Chunk> stream = chunkStream.subscribeOn(scheduler);
for (int i = 0; i < NUM_TASK_THREADS; i++) {
```
Use the parallel scheduler; this is CPU-bound. We probably also want to limit the number of subscribers to the number of cores on the machine.
```diff
- scheduler = Schedulers.newParallel("chunk processing", NUM_TASK_THREADS);
- Flux<Chunk> stream = chunkStream.subscribeOn(scheduler);
+ Flux<Chunk> stream = chunkStream.subscribeOn(Schedulers.parallel());
  for (int i = 0; i < NUM_TASK_THREADS; i++) {
```
I am using the parallel scheduler, but with a name and limiting it to `NUM_TASK_THREADS` threads. From the docs, it sounds like that's the only difference between `newParallel` and `parallel`. More than `NUM_TASK_THREADS` workers wouldn't be helpful as there are only that many subscriptions, so are you just talking about the case where the host CPU has fewer than `NUM_TASK_THREADS` cores?
Just realized this is a consuming operation, so maybe push the operation to `boundedElastic()`; that should reclaim the thread when we're finished.
Do you mean when the player is standing still and chunk generation has finished, or after a world is closed? In the former case, I don't think that usually happens for very long, so I would guess that reclaiming the workers would have more overhead; in the latter case, the whole scheduler is already disposed after the world is closed.
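On the fewer-cores question, a plain-JDK sketch of one way to cap a CPU-bound pool at `min(cores, NUM_TASK_THREADS)` (the constant and names below are hypothetical stand-ins for the PR's code; this is the sizing logic only, not Reactor's scheduler):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizing {
    // Hypothetical constant mirroring the PR's NUM_TASK_THREADS.
    static final int NUM_TASK_THREADS = 4;

    // For CPU-bound work, more workers than cores just adds contention,
    // so cap at whichever is smaller.
    static int workerCount() {
        return Math.min(Runtime.getRuntime().availableProcessors(), NUM_TASK_THREADS);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(workerCount());
        System.out.println(workerCount() <= NUM_TASK_THREADS); // true
        pool.shutdown();
    }
}
```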
```java
for (ChunkProcessingInfo chunkProcessingInfo : chunkProcessingInfoMap.values()) {
    ChunkTask chunkTask = chunkProcessingInfo.getChunkTask();
    if (chunkTask != null) {

        List<Chunk> providedChunks = new ArrayList<>();
        boolean satisfied = true;
        for (Vector3ic pos : chunkTask.getRequirements()) {
            Chunk chunk = getChunkBy(chunkProcessingInfo.getChunkTaskProvider(), pos);
```
What is the reasoning behind this? It seems strange to me: we have two threads pushing chunks to the same concurrent map, and at the same time we also attempt to iterate over the collection. There is going to be a lot of thread contention from locking the individual tasks and the map itself.
```diff
 protected ChunkProcessingPipeline(Function<Vector3ic, Chunk> chunkProvider, Flux<Chunk> chunkStream, boolean threaded) {
     this.chunkProvider = chunkProvider;
     scheduler = Schedulers.newParallel("chunk processing", NUM_TASK_THREADS);
-    Flux<Chunk> stream = chunkStream.subscribeOn(scheduler);
-    for (int i = 0; i < NUM_TASK_THREADS; i++) {
+    Flux<Chunk> stream = threaded ? chunkStream.subscribeOn(scheduler) : chunkStream;
+    for (int i = 0; i < (threaded ? NUM_TASK_THREADS : 1); i++) {
         stream.subscribe(new BaseSubscriber<Chunk>() {
```
This is kind of strange; it seems like something added just so it passes the unit test. I'm not sure this is a good direction for resolving the test?
It just doesn't use separate threads in the pipeline tests. I agree that that's not ideal, but it's better than adding `Thread.sleep(1000)` after each `Flux.create()`, which is basically what we'd have to do otherwise to ensure that the subscription actually happens before we try to submit chunks (that was what was causing the failing test).
I guess another option would be adding another signal object for waiting until subscription happens, plus a flag to check if it's already subscribed - that would probably work, but I'm not sure it would actually be less intrusive. If you think that would be better, though, I can do that.
Just make the `Scheduler` a parameter; then you can use the FIFO scheduler.
Good idea, I've just implemented that. Unfortunately there isn't an existing scheduler that runs everything immediately on the same thread, but a custom one that does that was just a 10-line anonymous class.
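For the record, the plain-JDK analogue of such a calling-thread scheduler is tiny: an `Executor` that just invokes the task in place (this sketch uses `java.util.concurrent.Executor` rather than Reactor's `Scheduler` interface; `Schedulers.immediate()`, mentioned later in this thread, plays the same role in Reactor).

```java
import java.util.concurrent.Executor;

public class CallingThreadExecutor {
    static boolean runsOnCaller() {
        // An Executor that runs each task immediately on the submitting thread.
        Executor immediate = Runnable::run;

        Thread caller = Thread.currentThread();
        final Thread[] ranOn = new Thread[1];
        immediate.execute(() -> ranOn[0] = Thread.currentThread());
        return ranOn[0] == caller; // no thread hop: deterministic in tests
    }

    public static void main(String[] args) {
        System.out.println(runsOnCaller()); // true
    }
}
```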
```java
 * @param threaded Whether to use worker threads to generate chunks. If it's false, all processing will be done on the main thread.
 *                 It should be set to false in tests that request processing of specific chunks.
 *                 Always true in a real game.
```
I think that doing away with the complexity of multi-threading during tests hopefully helps to fight flakiness and instability.
However, this also means that we're not testing the "real thing" here. The good part still is that if something goes wrong in the end at runtime we know to start looking at the concurrency parts...
@keturn might this pitfall be also occurring in Pathfinding, even without using Reactor?
I do agree, it's unfortunate that the concurrent version doesn't work reliably on CI. I'd kind of still like to be able to run it concurrently on my computer, since it does work reliably on a normal computer, but I don't know if there's a nice way to do that.
A lot of the complexity of this code is there explicitly for the reason of allowing chunk generation to happen in parallel.
If there were only a single-threaded sink processing things from a queue, there's a lot of synchronization and `currentlyProcessing`-style variables we wouldn't need.
If we are never testing the multi-threaded scenarios, we're seriously undermining the value of our tests.
```java
private Flux<Chunk> chunkFlux() {
    Set<Vector3ic> currentlyProcessing = new HashSet<>();
    return Flux.create(sink -> sink.onRequest(numChunks -> {
        // Figuring out the positions to generate needs to be synchronized
        List<Vector3ic> positionsPending = new ArrayList<>((int) numChunks);
        synchronized (this) {
            if (checkForUpdate()) {
                updateList();
            }

            while (positionsPending.size() < numChunks && !chunksInRange.isEmpty()) {
                Vector3ic pos = chunksInRange.remove(chunksInRange.size() - 1);
                if (currentlyProcessing.contains(pos) || loadingPipeline.isPositionProcessing(pos)) {
                    continue;
                }

                positionsPending.add(pos);
                currentlyProcessing.add(pos);
```
Important thing to know (for newbies like me to the Flux API): This emitter function we're passing to Flux.create (on line 460) is called with a new sink for each subscriber.
That's how we're managing to add multiple chunk-processing subscribers to this without having each subscriber process the same chunks.
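A stdlib analogue of that arrangement (hypothetical names, `Integer` positions standing in for chunk positions): several consumers drain one shared queue of pending work, and each item is handed to exactly one of them, which is why multiple subscribers don't process the same chunks.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SharedWorkQueue {
    /** Drains all positions with the given number of consumer threads; returns how many distinct positions got processed. */
    static int drainWith(int consumers, List<Integer> positions) {
        // Shared pending-work state, as with chunksInRange in the PR.
        ConcurrentLinkedQueue<Integer> pending = new ConcurrentLinkedQueue<>(positions);
        Set<Integer> processed = ConcurrentHashMap.newKeySet();

        Runnable subscriber = () -> {
            Integer pos;
            while ((pos = pending.poll()) != null) { // poll() hands each item to exactly one taker
                processed.add(pos);
            }
        };

        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(subscriber);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return processed.size();
    }

    public static void main(String[] args) {
        System.out.println(drainWith(2, List.of(1, 2, 3, 4, 5, 6))); // 6: every position exactly once
    }
}
```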
I'd like fewer levels of nested scope here. There's some state in fields of this LocalChunkProvider instance, some state captured in the scope of each invocation of `chunkFlux` (`currentlyProcessing`), and we're synchronizing on something in line 463, which is probably LocalChunkProvider but might be this anonymous Consumer object we're passing to `sink.onRequest`.
I think it'd be worth using more named methods (and maybe even a new class) to make it clearer what state lives where and what its lifetime is.
I agree, named methods would be good, and I think `currentlyProcessing` should just be a member variable. I'll change that, and add some comments explaining when things get called too, since I agree that it's not very intuitive.
I think I've got the gist of it now, though I haven't looked at all the details.
I fear we're torturing the Flux model with this.
I think that conceptually, we have some sort of publisher of relevant-locations-of-chunks at the top, and something that puts chunk data in to a chunk cache at the bottom, and we want some kind of
```java
f(Flux<Vector3ic> locations) {
    Flux<Chunk> fullChunks = locations
            .parallel()
            .map(chunkProvider);
    fullChunks.subscribe(cacheChunk);
}
```
with maybe some kind of buffering or windowing operator in there if you think we need to tune `CHUNKS_AT_ONCE`.
I think that the way this implementation of `LocalChunkProvider.chunkFlux` returns a `Flux<Chunk>` that looks different to every subscriber is technically fitting within the bounds of the types, but is contrary to the spirit of a Flux stream.
and I still think that ChunkProcessingPipeline creating a new Scheduler with its own thread pool is a thing we should do only as a last resort. I could be convinced that since chunk generation is such an important and ever-present task, it's worth making an exception for if necessary, but in general that's not a pattern we want systems to use.
That would definitely be nicer, and it's similar to what I tried to do first, but unfortunately it didn't seem to work with our execution model. One problem is that a
I'm also wondering about this - there's no mention in the Reactor docs that a flux should always emit the same values to each subscriber, but I also can't find any examples that do that.

This is the part I keep getting stuck on when trying to think about this design. It might help if I knew more about what the wake-up triggers would be. Every player movement? Or is there some set of relevance regions that we'd observe changes to?

It's woken up whenever the relevance region is updated, by calling

I've made two more videos with the same world, trying to compare current

@skaldarnar anything else blocking this?
I realize we lose some testability with this change since the scheduler is limited to a single thread in tests, but I wouldn't mind proceeding with the change regardless. I would opt to make a ticket explaining what would need to be changed to address the problems with the testing.
Reactor already contains the `Schedulers.immediate()` scheduler.
Good idea, thank you! I thought I had tried this but apparently I hadn't, because it seems to work.
Contains
This is a rewrite of #4773 to use Reactor for concurrency (see #4798 and #4786). It does the same thing as the old PR, but is a lot simpler and has more reliable concurrency. Performance should be about the same.
How to test
Try flying around the world with this PR - it should be the same as #4773 and much better than normal.