
feat: rework chunk ordering using Reactor #4822

Open
wants to merge 23 commits into develop

Conversation

@naalit (Contributor) commented Jul 14, 2021

Contains

This is a rewrite of #4773 to use Reactor for concurrency (see #4798 and #4786). It does the same thing as the old PR, but is a lot simpler and has more reliable concurrency. Performance should be about the same.

How to test

Try flying around the world with this PR - it should be the same as #4773 and much better than normal.

@github-actions bot added the Type: Improvement label Jul 14, 2021
@pollend (Member) commented Jul 14, 2021

oi, didn't expect this. I'll have to take a look tonight.

@@ -44,7 +44,7 @@

class LocalChunkProviderTest {

-    private static final int WAIT_CHUNK_IS_READY_IN_SECONDS = 5;
+    private static final int WAIT_CHUNK_IS_READY_IN_SECONDS = 30;
Contributor Author

While 5 is long enough for my computer, it appears that the CI machine does unfortunately need 30 seconds to generate all the chunks. It probably has a lot of threads running at once.

@@ -118,6 +117,12 @@ public void removeRelevanceEntity(EntityRef entity) {
}
}

public Stream<Vector3i> neededChunks() {
Member

I think that we should try to explain what we're doing here when writing new methods, or at least add a docstring to region to make it simpler for others to figure out what is happening here.

Comment on lines +413 to +415
relevanceSystem.neededChunks()
.filter(pos -> !chunksInRange.contains(pos))
.forEach(chunksInRange::add);
Member

Not sure which one is "better", as this is already pretty clear in what it does. In general, I try to push the side effect to the outside, e.g., instead of adding to another list from a forEach I'd do:

Suggested change
-relevanceSystem.neededChunks()
-    .filter(pos -> !chunksInRange.contains(pos))
-    .forEach(chunksInRange::add);
+chunksInRange.addAll(
+    relevanceSystem.neededChunks()
+        .filter(pos -> !chunksInRange.contains(pos))
+        .collect(Collectors.toList()));

However, this has the additional collector in place we don't really need here, and the original solution is pretty clean already.

Contributor Author

That is a little bit clearer, but I think I would prefer keeping it as is to avoid allocating the extra list.

.filter(pos -> !chunksInRange.contains(pos))
.forEach(chunksInRange::add);
chunksInRange.removeIf(x -> !relevanceSystem.isChunkInRegions(x) || isChunkReady(x));
chunksInRange.sort(relevanceSystem.createChunkPosComparator().reversed());
Member

Could there be duplicates ending up in chunksInRange for some reason?

Contributor Author

I don't think so, the only place we add to chunksInRange is the line above which filters out duplicates.

Contributor Author

Oh wait, duplicates actually are possible when there are multiple clients connected with overlapping relevance regions, since there will be duplicates in neededChunks. However, that shouldn't actually cause any problems because the code to generate chunks checks currentlyProcessing first.

private Chunk genChunk(Vector3ic pos) {
ChunkStore chunkStore = storageManager.loadChunkStore(pos);
Chunk chunk;
EntityBufferImpl buffer = new EntityBufferImpl();
Member

Can we move this inside the if to only create a new instance if we really need it?

Contributor Author

That's a good idea, I just moved this code from somewhere else in the file but I'll change it.

Comment on lines 54 to 56
scheduler = Schedulers.newParallel("chunk processing", NUM_TASK_THREADS);
Flux<Chunk> stream = chunkStream.subscribeOn(scheduler);
for (int i = 0; i < NUM_TASK_THREADS; i++) {
Member

use the parallel scheduler; this is cpu-bound. we probably also want to limit the number of subscribers to the number of cores on the machine.

Suggested change
-scheduler = Schedulers.newParallel("chunk processing", NUM_TASK_THREADS);
-Flux<Chunk> stream = chunkStream.subscribeOn(scheduler);
-for (int i = 0; i < NUM_TASK_THREADS; i++) {
+Flux<Chunk> stream = chunkStream.subscribeOn(Schedulers.parallel());
+for (int i = 0; i < NUM_TASK_THREADS; i++) {

Contributor Author

I am using the parallel scheduler, but with a name and limiting it to NUM_TASK_THREADS threads. From the docs, it sounds like that's the only difference between newParallel and parallel. More than NUM_TASK_THREADS workers wouldn't be helpful as there are only that many subscriptions, so are you just talking about the case where the host CPU has fewer than NUM_TASK_THREADS cores?
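For readers less familiar with Reactor, the scheduler factories being weighed here differ roughly as follows. This is a minimal standalone sketch, not project code; the pool name and the thread count 4 are illustrative stand-ins for the PR's values.

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class SchedulerChoices {
    public static void main(String[] args) {
        // Dedicated pool: a fixed number of threads created just for this
        // pipeline, alive until dispose() is called.
        Scheduler dedicated = Schedulers.newParallel("chunk processing", 4);

        // Shared parallel pool: sized to the number of CPU cores and reused
        // across the whole JVM; intended for CPU-bound work.
        Scheduler shared = Schedulers.parallel();

        // Shared bounded-elastic pool: grows on demand and reclaims idle
        // threads; intended for blocking or long-running work.
        Scheduler elastic = Schedulers.boundedElastic();

        dedicated.dispose(); // only the dedicated pool needs explicit disposal
    }
}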

Member

just realized this is a consuming operation so maybe push the operation to boundedElastic(). should re-claim the thread when we're finished.

Contributor Author

Do you mean when the player is standing still and chunk generation has finished, or after a world is closed? In the former case, I don't think that usually happens for very long, so I would guess that reclaiming the workers would have more overhead; in the latter case, the whole scheduler is already disposed after the world is closed.

Comment on lines 128 to 135
for (ChunkProcessingInfo chunkProcessingInfo : chunkProcessingInfoMap.values()) {
    ChunkTask chunkTask = chunkProcessingInfo.getChunkTask();
    if (chunkTask != null) {

        List<Chunk> providedChunks = new ArrayList<>();
        boolean satisfied = true;
        for (Vector3ic pos : chunkTask.getRequirements()) {
            Chunk chunk = getChunkBy(chunkProcessingInfo.getChunkTaskProvider(), pos);
@pollend (Member) Jul 17, 2021

what is the reasoning behind this. seems so strange to me. so we have 2 threads pushing chunks to the same concurrent map and at the same time we also attempt to iterate over the collection. there is going to be a lot of thread contention with locking the individual tasks, and the map itself.

Comment on lines 64 to 69
protected ChunkProcessingPipeline(Function<Vector3ic, Chunk> chunkProvider, Flux<Chunk> chunkStream, boolean threaded) {
    this.chunkProvider = chunkProvider;
    scheduler = Schedulers.newParallel("chunk processing", NUM_TASK_THREADS);
-   Flux<Chunk> stream = chunkStream.subscribeOn(scheduler);
-   for (int i = 0; i < NUM_TASK_THREADS; i++) {
+   Flux<Chunk> stream = threaded ? chunkStream.subscribeOn(scheduler) : chunkStream;
+   for (int i = 0; i < (threaded ? NUM_TASK_THREADS : 1); i++) {
    stream.subscribe(new BaseSubscriber<Chunk>() {
Member

this is kind of strange, seems like something just so it passes the unit test. I'm not sure this is a good direction that resolves the test?

Contributor Author

It just doesn't use separate threads in the pipeline tests. I agree that that's not ideal, but it's better than adding Thread.sleep(1000) after each Flux.create(), which is basically what we'd have to do otherwise to ensure that the subscription actually happens before we try to submit chunks (that was what was causing the failing test).

I guess another option would be adding another signal object for waiting until subscription happens, plus a flag to check if it's already subscribed - that would probably work, but I'm not sure it would actually be less intrusive. If you think that would be better, though, I can do that.

Member

just make Scheduler a parameter, then you can use the FIFO scheduler.

Contributor Author

Good idea, I've just implemented that. Unfortunately there isn't an existing scheduler that runs everything immediately on the same thread, but a custom one that does that was just a 10 line anonymous class.

Comment on lines 60 to 62
* @param threaded Whether to use worker threads to generate chunks. If it's false, all processing will be done on the main thread.
* It should be set to false in tests that request processing of specific chunks.
* Always true in a real game.
Member

I think that doing away with the complexity of multi-threading during tests hopefully helps to fight flakiness and instability.

However, this also means that we're not testing the "real thing" here. The good part still is that if something goes wrong in the end at runtime we know to start looking at the concurrency parts...

@keturn might this pitfall be also occurring in Pathfinding, even without using Reactor?

Contributor Author

I do agree, it's unfortunate that the concurrent version doesn't work reliably on CI. I'd kind of still like to be able to run it concurrently on my computer, since it does work reliably on a normal computer, but I don't know if there's a nice way to do that.

Member

A lot of the complexity of this code is there explicitly for the reason of allowing chunk generation to happen in parallel.

If there were only a single-threaded sink processing things from a queue, there's a lot of synchronization and currentlyProcessing sorts of variables we wouldn't need.

If we are never testing the multi-threaded scenarios, we're seriously undermining the value of our tests.

@keturn added the Topic: Concurrency label Jul 26, 2021
Comment on lines 458 to 475
private Flux<Chunk> chunkFlux() {
    Set<Vector3ic> currentlyProcessing = new HashSet<>();
    return Flux.create(sink -> sink.onRequest(numChunks -> {
        // Figuring out the positions to generate needs to be synchronized
        List<Vector3ic> positionsPending = new ArrayList<>((int) numChunks);
        synchronized (this) {
            if (checkForUpdate()) {
                updateList();
            }

            while (positionsPending.size() < numChunks && !chunksInRange.isEmpty()) {
                Vector3ic pos = chunksInRange.remove(chunksInRange.size() - 1);
                if (currentlyProcessing.contains(pos) || loadingPipeline.isPositionProcessing(pos)) {
                    continue;
                }

                positionsPending.add(pos);
                currentlyProcessing.add(pos);
Member

Important thing to know (for newbies like me to the Flux API): This emitter function we're passing to Flux.create (on line 460) is called with a new sink for each subscriber.

That's how we're managing to add multiple chunk-processing subscribers to this without having each subscriber process the same chunks.
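A tiny standalone example of that per-subscriber behaviour (not project code; all names here are made up): each call to subscribe() runs the emitter lambda again with a fresh FluxSink, so the two subscribers below draw from independently numbered sinks.

import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class PerSubscriberSinkDemo {
    public static void main(String[] args) {
        AtomicInteger sinkCount = new AtomicInteger();
        Flux<String> flux = Flux.create(sink -> {
            int id = sinkCount.incrementAndGet(); // runs once per subscriber
            sink.onRequest(n -> {
                for (int i = 0; i < 3 && i < n; i++) {
                    sink.next("sink " + id + " item " + i);
                }
                sink.complete();
            });
        });

        flux.subscribe(System.out::println); // emitter invoked with sink 1
        flux.subscribe(System.out::println); // emitter invoked again with sink 2
    }
}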

Member

I'd like fewer levels of nested scope here.

There's some state in fields of this LocalChunkProvider instance, some state captured in the scope of each invocation of chunkFlux (currentlyProcessing), we're synchronizing on something in line 463 which is probably LocalChunkProvider but might be this anonymous Consumer object we're passing to sink.onRequest.

I think it'd be worth using more named methods (and maybe even a new class) to make it clearer what state lives where and what its lifetime is.

Contributor Author

I agree, named methods would be good and I think currentlyProcessing should just be a member variable. I'll change that, and add some comments explaining when things get called too, since I agree that it's not very intuitive.

@keturn (Member) left a comment

I think I've got the gist of it now, though I haven't looked at all the details.

I fear we're torturing the Flux model with this.

I think that conceptually, we have some sort of publisher of relevant-locations-of-chunks at the top, and something that puts chunk data in to a chunk cache at the bottom, and we want some kind of

f(Flux<Vector3ic> locations) {
   Flux<Chunk> fullChunks = locations
      .parallel()
      .map(chunkProvider);
   fullChunks.subscribe(cacheChunk);
}

with maybe some kind of buffering or windowing operator in there if you think we need to tune CHUNKS_AT_ONCE.

I think that the way this implementation of LocalChunkProvider.chunkFlux returns a Flux<Chunk> that looks different to every subscriber is technically fitting within the bounds of the types, but is contrary to the spirit of a Flux stream.

and I still think that ChunkProcessingPipeline creating a new Scheduler with its own thread pool is a thing we should do only as a last resort. I could be convinced that since chunk generation is such an important and ever-present task, it's worth making an exception for if necessary, but in general that's not a pattern we want systems to use.
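For comparison, a rough sketch of what that conceptual shape looks like when written against Reactor's actual operators. This is illustrative only: generateChunks is a hypothetical method name, chunkProvider and cacheChunk are the placeholder names from the sketch above, and the generic type C stands in for the engine's chunk type. Note that parallel() alone only divides the stream into rails; it is runOn() that actually moves the work onto a scheduler.

import java.util.function.Consumer;
import java.util.function.Function;
import org.joml.Vector3ic;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class ChunkPipelineSketch {
    <C> void generateChunks(Flux<Vector3ic> locations,
                            Function<Vector3ic, C> chunkProvider,
                            Consumer<C> cacheChunk) {
        locations
                .parallel()                    // split into rails (one per core by default)
                .runOn(Schedulers.parallel())  // move the rails onto the shared parallel pool
                .map(chunkProvider)            // generate each chunk
                .subscribe(cacheChunk);        // push finished chunks into the cache
    }
}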

@naalit (Contributor Author) commented Jul 27, 2021

> I think that conceptually, we have some sort of publisher of relevant-locations-of-chunks at the top, and something that puts chunk data in to a chunk cache at the bottom, and we want some kind of
>
> f(Flux<Vector3ic> locations) {
>    Flux<Chunk> fullChunks = locations
>       .parallel()
>       .map(chunkProvider);
>    fullChunks.subscribe(cacheChunk);
> }
>
> with maybe some kind of buffering or windowing operator in there if you think we need to tune CHUNKS_AT_ONCE.

That would definitely be nicer, and it's similar to what I tried to do first, but unfortunately it didn't seem to work with our execution model. One problem is that a Flux<Vector3ic> and then a Function<Vector3ic, Chunk> doesn't work well with the RemoteChunkProvider, which would have to maintain another hash map of received chunks. Another is that when there are no more chunks to generate right now and execution pauses, there's no way to wake it up again from the main thread - a possible solution to this is a signal object that the location flux waits on, but that's not very nice either.

> I think that the way this implementation of LocalChunkProvider.chunkFlux returns a Flux<Chunk> that looks different to every subscriber is technically fitting within the bounds of the types, but is contrary to the spirit of a Flux stream.

I'm also wondering about this - there's no mention in the Reactor docs that a flux should always emit the same values to each subscriber, but I also can't find any examples that do that.

@keturn (Member) commented Jul 28, 2021

> Another is that when there are no more chunks to generate right now and execution pauses, there's no way to wake it up again from the main thread

This is the part I keep getting stuck on when trying to think about this design.

It might help if I knew more about what the wake-up triggers would be. Every player movement? Or is there some set of relevance regions that we'd observe changes to?

@naalit (Contributor Author) commented Jul 28, 2021

> It might help if I knew more about what the wake-up triggers would be. Every player movement? Or is there some set of relevance regions that we'd observe changes to?

It's woken up whenever the relevance region is updated, by calling LocalChunkProvider.notifyRelevanceChanged(). In practice that's every time the player moves.
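Purely as an illustration of that wake-up flow (this is not code from the PR; everything except Reactor's own Sinks and Flux API is hypothetical): the relevance change can be modelled as a hot signal that the main thread emits into, with each signal prompting a re-computation of the positions that still need generating.

import java.util.List;
import java.util.function.Supplier;
import org.joml.Vector3ic;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class RelevanceWakeUpSketch {
    // Hot source the main thread signals into whenever relevance changes.
    private final Sinks.Many<Boolean> relevanceChanged =
            Sinks.many().multicast().onBackpressureBuffer();

    // Hypothetical counterpart of LocalChunkProvider.notifyRelevanceChanged(),
    // called from the main thread, e.g. on player movement.
    void notifyRelevanceChanged() {
        relevanceChanged.tryEmitNext(Boolean.TRUE);
    }

    // Each wake-up re-evaluates which chunk positions are still needed.
    Flux<Vector3ic> locations(Supplier<List<Vector3ic>> neededPositions) {
        return relevanceChanged.asFlux()
                .flatMapIterable(signal -> neededPositions.get());
    }
}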

@pollend pollend requested a review from keturn August 4, 2021 02:21
@skaldarnar (Member)

I've made two more videos with the same world, trying to compare current develop against this PR:

https://youtu.be/TloLsta3dcM

https://youtu.be/dXdL9KDQKSg

@pollend (Member) commented Aug 7, 2021

@skaldarnar anything else blocking this?

pollend previously approved these changes Aug 7, 2021
@pollend (Member) left a comment

I realize we lose some testability with this change since the scheduler is limited to a single thread in the test, but wouldn't mind proceeding with the change regardless. would opt to make a ticket explaining what would need to be changed to address the problems with the testing.

@DarkWeird (Contributor) left a comment

Reactor contains the Schedulers.immediate() scheduler already

@naalit (Contributor Author) commented Aug 19, 2021

> Reactor contains the Schedulers.immediate() scheduler already

Good idea, thank you! I thought I had tried this but apparently I hadn't, because it seems to work.
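For reference, Schedulers.immediate() runs every task synchronously on the calling thread, which is what makes the test path deterministic. A minimal illustration (illustrative code only, not the PR's test code):

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class ImmediateSchedulerDemo {
    public static void main(String[] args) {
        // With Schedulers.immediate(), subscribeOn keeps subscription and
        // emission on the calling thread, so everything below has already
        // run by the time subscribe() returns.
        Flux.range(1, 3)
                .subscribeOn(Schedulers.immediate())
                .subscribe(i -> System.out.println(
                        "processed " + i + " on " + Thread.currentThread().getName()));
    }
}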

@jdrueckert jdrueckert added this to the 5.4.0 milestone Sep 4, 2022