
Batching Oplog Entries & DDP messages #10478

Closed
KoenLav wants to merge 17 commits

Conversation

@KoenLav (Contributor) commented Mar 4, 2019

This PR implements a couple of things:

  • DDP now supports sending an Array of messages, rather than only a single message;
  • In the Session on the server, the messages for every individual client are buffered (if additional messages are received within 10ms, the buffer keeps filling until at most 1000 messages have accrued or 500ms has passed);
  • In the Crossbar on the server, oplog entries are buffered for every (potentially re-used) ObserveHandle (if additional entries are received for the same ObserveHandle within 5ms, the buffer keeps filling until at most 1000 entries have accrued or 500ms has passed). A minimal sketch of this buffer-and-flush idea follows below.
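For illustration only, a minimal sketch of the buffer-and-flush idea described above (the names MessageBuffer, flushAfterMs, maxAgeMs and maxSize are my own, not the PR's actual identifiers):

// Hypothetical sketch (not the PR's actual code): collect messages per client and
// flush when the buffer has been idle for flushAfterMs, grows past maxSize,
// or its oldest message becomes older than maxAgeMs.
class MessageBuffer {
  constructor(send, { flushAfterMs = 10, maxAgeMs = 500, maxSize = 1000 } = {}) {
    this.send = send;            // callback that actually writes to the socket
    this.flushAfterMs = flushAfterMs;
    this.maxAgeMs = maxAgeMs;
    this.maxSize = maxSize;
    this.messages = [];
    this.firstQueuedAt = null;   // timestamp of the oldest buffered message
    this.timer = null;
  }

  push(msg) {
    if (this.firstQueuedAt === null) this.firstQueuedAt = Date.now();
    this.messages.push(msg);

    const tooMany = this.messages.length >= this.maxSize;
    const tooOld = Date.now() - this.firstQueuedAt >= this.maxAgeMs;
    if (tooMany || tooOld) {
      this.flush();
      return;
    }

    // Otherwise wait a little longer in case more messages arrive.
    if (this.timer) clearTimeout(this.timer);
    this.timer = setTimeout(() => this.flush(), this.flushAfterMs);
  }

  flush() {
    if (this.timer) clearTimeout(this.timer);
    this.timer = null;
    if (this.messages.length === 0) return;
    this.send(this.messages);    // one stringify and one socket write for the batch
    this.messages = [];
    this.firstQueuedAt = null;
  }
}

In this sketch a Session would own one such buffer per connected client and hand an Array of messages to the existing send path on each flush.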

This decreases the load on the server for two reasons:

  • when messages are buffered inside the Session on the server, there is less overhead from stringifying and sending each message individually over the WebSocket;
  • when messages are buffered as they are received in the Crossbar, we give the server a chance to recognize that it is falling too far behind (and possibly fall back to polling).

To see the effect of this PR I would recommend cloning this repository (https://github.com/koenlav/meteor-fiber-repro) and comparing the performance with the development branch.

=====

This PR supersedes both #9862 and #9885, but does not (yet) achieve the desired end-result from #9876.

Also, it does not incorporate the changes from #9622, as that PR appears to be stale.

While this PR makes sure we buffer at the start and the end of the MongoDB => socket pipeline, it should be possible to refactor all the classes in between to accept 'messages' (rather than individual added/changed/removed events), which would let us use the full potential of these buffers; we would then only need to unwind the array of messages in one or two places on the server.

@KoenLav (Contributor, Author) commented Mar 4, 2019

Tests are likely to fail because not all parts of the code that make use of DDP expect/understand message Arrays instead of singular messages; I will definitely need some help pinpointing these issues.

messages = Array.isArray(messages) ? messages : [messages];

for (var i = 0; i < messages.length; i++) {
  var msg = messages[i];
@KoenLav (Contributor, Author):
After this line nothing besides indentation was changed; GitHub just does a bad job of highlighting that.

@smeijer commented Mar 6, 2019:

Off-topic advice: append ?w=1 to the review URL to hide whitespace-only changes.

https://github.com/meteor/meteor/pull/10478/files/2870711fbb508d8375f413b3e0bf31c81ed29d8f?w=1

Or install Refined GitHub to get a "No Whitespace" button.


@@ -38,80 +38,102 @@ export function last(array, n, guard) {
return slice.call(array, Math.max(array.length - n, 0));
}

DDPCommon.SUPPORTED_DDP_VERSIONS = [ '1', 'pre2', 'pre1' ];
DDPCommon.SUPPORTED_DDP_VERSIONS = [ '2', '1', 'pre2', 'pre1' ];
@KoenLav (Contributor, Author):

I guess a new version of DDP would be necessary.

Things should be backwards compatible!
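To make the backwards-compatibility idea concrete, here is a hedged sketch of version-gated sending (my own illustration reusing the version >= 2 check from the diff; JSON.stringify stands in for the real DDP stringification):

// Hypothetical illustration: only clients that negotiated DDP version '2' (or later)
// receive an Array of messages; older clients keep receiving one message per write.
function sendToClient(session, messages) {
  const batch = Array.isArray(messages) ? messages : [messages];
  if (session.version >= 2) {
    // One stringify and one socket write for the whole batch.
    session.socket.send(JSON.stringify(batch));
  } else {
    // Backwards-compatible path: one DDP message per write.
    batch.forEach((msg) => session.socket.send(JSON.stringify(msg)));
  }
}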

messages = Array.isArray(messages) ? messages : [messages];

for (let i = 0; i < messages.length; i++) {
  var msg = messages[i];
@KoenLav (Contributor, Author):
Same here, nothing after this line changed.


for (let i = 0; i < messages.length; i++) {
  const msg = messages[i];
  const copy = EJSON.clone(msg);
@KoenLav (Contributor, Author):
Same here, no changes after this line.


if (! _.has(self.listenersByCollection, collection)) {
return;
if (listenersForCollection) {
@KoenLav (Contributor, Author):
I'm not sure whether we even still need the listeners (we could also refactor the remaining listener usages to use the buffer instead, I think).

@@ -265,6 +265,26 @@ var Session = function (server, version, socket, options) {
// List of callbacks to call when this connection is closed.
self._closeCallbacks = [];

// DDP clients with version 2 and up should support batching of DDP messages
var allowBuffering = version >= 2;
@KoenLav (Contributor, Author):

Buffering is enabled by default.

self._bufferedMessagesMaxAge = options.bufferedMessagesMaxAge || 500;

// Maximum amount of messages to store in the buffer before flushing
self._bufferedMessagesMaxAmount = options.bufferedMessagesMaxAmount || 1000;
@KoenLav (Contributor, Author):

These values (10, 500, 1000) are somewhat arbitrary, I guess mileage may vary.

// Maximum age of the buffer
self.bufferMaxAge = 500;
// Maximum amount of notifications to store in the buffer before flushing
self.bufferMaxSize = 1000;
@KoenLav (Contributor, Author):

These values (5, 500, 1000) are somewhat arbitrary, I guess mileage may vary.

A Collaborator commented:

Might be worth making these configurable.
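For what it's worth, a hedged sketch of what making the Crossbar thresholds configurable could look like (the Meteor.settings key crossbarBuffering is a hypothetical name, not something this PR adds):

// Hypothetical sketch: read the thresholds from Meteor.settings, falling back to
// the PR's hard-coded values, instead of fixing them in the Crossbar constructor.
const crossbarSettings =
  (Meteor.settings && Meteor.settings.crossbarBuffering) || {};

// Maximum age of the buffer before a forced flush (ms).
self.bufferMaxAge = crossbarSettings.bufferMaxAge || 500;
// Maximum number of notifications to hold before a forced flush.
self.bufferMaxSize = crossbarSettings.bufferMaxSize || 1000;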

@KoenLav (Contributor, Author) commented Sep 6, 2019

@benjamn any updates on this?

@mitar (Contributor) commented Sep 19, 2019

I would like to reiterate my comment here:

I think this is a substantial change, and also requires DDP protocol changes, which I would not take so lightly at this point in Meteor development life (all other DDP clients would then have to be updated).

Moreover, it might be an unnecessary change if the underlying fiber pool issue is fixed. I think it might be a better approach to get the Node version used by Meteor to include a fix addressing the fiber pool issue. In that case both this issue and any other code path which produces a spike of fibers would perform better. This is why we need a test to measure the performance improvement, so that we can compare existing code vs. existing code with the fiber patch vs. existing code with batching of messages vs. existing code with the fiber patch and batching of messages. Then we can know which approach to take.

Edit: Everyone reading this PR, please see the whole discussion in the PR I linked above, which starts with my comment there.

@KoenLav (Contributor, Author) commented Sep 20, 2019

@mitar in reply to that I'd like to reiterate my comment here.

Some other reiterations and observations:

  1. Anyone can test and see the benefits of this PR easily (long before any Fibers issues are triggered):
    "To see the effect of this PR I would recommend cloning this repository (koenlav/meteor-fiber-repro) and comparing the performance with the development branch."

  2. You keep asking for tests, which I have provided (and anyone can easily reproduce themselves), but you seem unwilling to test things yourself?
    "This is why we need a test to measure performance improvement." => DIY?

  3. The update to the DDP spec is implemented in a backwards-compatible manner (clients which do not accept Array-form messages won't receive them):
    "I guess a new version of DDP would be necessary. Things should be backwards compatible though!"
    "All other DDP clients would then have to be updated" => not true.

  4. The issue of creating a lot of Fibers should be resolved regardless, as confirmed by @kentonv (it's not a good idea to create a huge number of Fibers due to the allocation of a stack for each Fiber). Implementing a pool for the Fibers alleviates the issue when it does occur (so it is a good idea regardless), but that doesn't mean we shouldn't try to keep it from happening in the first place.
    "Moreover, it might be an unnecessary change if underlying fiber pool issue is fixed." => Not true.

Summarizing, I think our goals should be:

  • we don't want Fiber spikes;
  • if they do occur, we want them to have a minimal impact.

Can we at least agree on that?

ps apologies if my reply seems rather direct, but I feel like you're only trying to make a very specific point without (objectively) looking at the pros and cons.

@mitar (Contributor) commented Sep 20, 2019

Anyone can test and see the benefits of this PR easily (long before any Fibers issues are triggered):

Of course, but I am saying that fixing the Fiber issues would provide similar benefits, and is a more general solution.

The update to the DDP spec is implemented in a backwards compatible manner (clients which do not accept Array form messages won't receive them):

I see, you check for the version. Nice.

apologies if my reply seems rather direct, but I feel like you're only trying to make a very specific point without (objectively) looking at the pros and cons.

No no. Direct replies are good. :-)

we don't want Fiber spikes;
if they do occur, we want them to have a minimal impact.

See, we do not agree here. We do not want Fiber spikes. So let's fix that. No need for the second part.

@KoenLav (Contributor, Author) commented Sep 20, 2019

Thanks for your constructive reply! :)

The issue in Node.js (V8, actually) that made a spike in Fibers so costly has been resolved and merged into Node.js 8.12: #10090 (comment)

The proposed patch by @nathan-muir moves the Fibers into a pool which promotes re-use of earlier created stacks rather than creating a new stack per Fiber (as far as I understand).

The above are ways to mitigate the huge performance penalties associated with Fiber spikes. This PR, on the other hand, attempts to give Meteor a way to recognize when it is about to create a Fiber spike and bail out (by fetching once, instead of thousands of times).

Meteor has always had logic to fall back to fetching for a specific collection if there is too much to fetch, but as far as I can tell this condition is never triggered: at some point Meteor consumes 100% CPU, roughly 50% on incoming Oplog notifications and 50% on working through them, so the 'counter' which keeps track of how far we are behind stays at roughly the same number and never triggers a fetch.

This PR accomplishes two things:

  • it improves performance for high-throughput Collections in Meteor by batching work together (as seen in the tests);
  • by batching, it allows Meteor to recognize that it is about to cause a Fiber spike and bail out to fetching instead of processing the oplog entry by entry.

What you seem to be saying is that you don't want the effects of a Fiber spike, but you don't want to resolve the cause of the Fiber spike?

The alternative approaches you are suggesting alleviate the effects of a Fiber spike, but don't do anything to keep it from happening.

if (self._needToFetch.size() >= 1000) {
  self._needToPollQuery();
}

@KoenLav (Contributor, Author):

@mitar

Here we decide to bail out of fetching 1000 documents via individual MongoDB calls (one per document) and instead fetch all the needed documents in one operation.

On line 515 below you can see that the _docFetcher will be called for every document which 'currently needs to be fetched', which is the cause of the Fibers spike.

ps I'm not sure that this is already the perfect solution, but I am 100% sure that we have isolated the cause and should be working on ways to resolve it.

pps an alternative approach would be:

  • batch Oplog messages (otherwise Meteor can never find out when to fetch multiple documents in one operation);
  • pass an Array of _ids to the _docFetcher (basically rewriting the _docFetcher to fetch multiple docs); a rough sketch of this idea follows below.

But we will need to discuss this with people with more intricate knowledge of how this has been setup.
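To illustrate that second alternative, a hedged sketch of a batched fetch (the helper name fetchNeededDocs and its return shape are hypothetical; wiring it into the multiplexer callbacks is left out):

// Hypothetical sketch: one MongoDB query for all pending _ids instead of one
// query (and one Fiber) per _id.
function fetchNeededDocs(collection, ids) {
  const docs = collection.find({ _id: { $in: ids } }).fetch();
  const byId = new Map(docs.map((doc) => [doc._id, doc]));
  // An _id missing from the result means the document no longer exists.
  return ids.map((id) => ({ id, doc: byId.get(id) || null }));
}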

@retani commented Sep 20, 2019

In the Session on the Server, the messages for every individual client are buffered (if additional messages are received within 10ms, the buffer fills up until at most 1000 messages have accrued or 500ms has passed);

Is there an easy way to opt out of this or configure the numbers? I have several real-time apps that rely, and have always relied, on instant sending of DDP messages (and do not serve many simultaneous clients).

@mitar (Contributor) commented Sep 20, 2019

@retani Do you know that there is already batching happening on the client side? You can disable that, but just checking if you are doing that.

@retani commented Sep 20, 2019

@retani Do you know that there is already batching happening on the client side? You can disable that, but just checking if you are doing that.

Good point. I know, but I don't know how to disable it. How do I do that? Is it documented somewhere?

@mitar (Contributor) commented Sep 20, 2019

Good point. I know but I don't know how to disable it. How to do it? Is it documented somewhere?

Not sure if it is documented; it should be. It is one or two attributes on the Meteor.connection instance.
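For readers looking for those attributes: as far as I can tell (this is my own reading of the ddp-client source, not confirmed in this thread), the client-side batching is controlled by the connection's buffered-writes fields, roughly like this:

// Hedged sketch, assuming Meteor's client Connection exposes the bufferedWrites
// knobs as (undocumented) instance fields; verify against your Meteor version.
Meteor.connection._bufferedWritesInterval = 0; // flush as soon as a write arrives
Meteor.connection._bufferedWritesMaxAge = 0;   // never hold writes back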

@KoenLav (Contributor, Author) commented Sep 22, 2019

@retani I've implemented the "buffered/batched listening" alongside the pre-existing "immediate listening" and have used a flag throughout the code to enable it, so it would be trivial to make this behavior configurable on a per-subscription basis.

@retani commented Sep 26, 2019

@KoenLav that's great! So how would I disable it?
I think the problem might be documentation here. There should be some more information here: https://docs.meteor.com/api/pubsub.html#Meteor-subscribe

@CLAassistant commented Oct 7, 2019

CLA assistant check
All committers have signed the CLA.

@sebakerckhof (Contributor) commented Aug 28, 2020

So I think there are a couple of reasons this isn't progressing:

  1. Honestly, I don't think any of us feels comfortable enough with these parts of the code to guarantee it won't break stuff.
    OK, there is the DDP version, which should ensure that old DDP clients don't break. But it has the potential to break external packages like meteor-streams / meteor-direct-stream-access, which are no longer always maintained but are still quite popular.
    It may also change semantics in certain subtle ways that are not immediately obvious.

  2. It's hard to validate the performance claims in real-world applications. Sure, there is a test repo, but that's a micro-benchmark which may not translate to real-world usage with lots of subscribers on normal use cases. Furthermore, it only looks at CPU, but now you may get memory spikes from the buffers which, with a lot of subscribers, may lead to memory issues resulting in the opposite effect on CPU.

  3. This PR does 2 unrelated things: buffering of the oplog messages and of the DDP messages. I think it would be better if it were split up.
    IIRC, this whole thing started because of fiber spikes. So you started with buffering DDP messages, until it was discovered that this is not what causes the fiber spikes, but rather the oplog tailing.
    So all that is left as an argument for the DDP buffers is that it is more performant to serialize one big object instead of multiple small ones, but from what I see this isn't backed by the evidence (e.g. Batch messages sent over DDP from server to client (DDP V2?) #9862 (comment)). So are there any gains, and is it worth the risk of introducing a new DDP version, possibly breaking other packages, having memory spikes, etc.?

The Crossbar part seems more easily acceptable to me; there the number of subscribers doesn't really matter, so it is easier to test and reason about.

@KoenLav (Contributor, Author) commented Aug 28, 2020

I think the main reason this PR hasn't progressed is that nobody from the Meteor team had taken the time to take a proper look at it, until you just did.

Thanks for taking the time to do so!

Now that that is out of the way: I think your criticism makes a lot of sense, i.e. splitting the PR into smaller parts which are easier to review.

I don't agree that batching in Oplog and DDP are unrelated (when batching Oplog entries it makes sense to batch them all the way to the client, rather than unwinding them before sending them over the wire), but I do see how splitting these things would make these changes easier to discuss.

If there's interest in improving the performance of the OplogObserver/Crossbar (which helps Meteor in spotting when a Fiber spike may occur due to a high number of Oplog entries) then I'd be happy to work on these changes.

If not, that's also fine, but then it's best to just close this PR and leave things as is.

@sebakerckhof (Contributor) commented Aug 28, 2020

I don't agree that batching in Oplog and DDP are unrelated (when batching Oplog entries it makes sense to batch them all the way to the client, rather than unwinding them before sending them over the wire),

You would be right if there were only one publication for each collection, but the thing is that it's not the oplog entries which are being sent to the client. It's the updates from the session collection view (aka mergebox). So they have to be unwound and processed one by one to be able to send the deltas for each user specifically. That's why I say they're unrelated.

In fact this is already a form of batching (over multiple publications).

So let's say you have 2 publications on the same collection, this is the flow of the messages:

pub 1 oplog tail entry -> multiplexer -> observe handle -> session collection view 
                                                                                    \
                                                                                     => ddp message
                                                                                    /
pub 2 oplog tail entry -> multiplexer -> observe handle -> session collection view 

If a document gets removed, it will be removed from both publications, but the session collection view will make sure you only get 1 removed message.
If a document gets removed from just a single publication (because it does not match the query anymore), but it still matches the other one, the session collection view will make sure you don't get a removed message (at most a changed message unsetting the fields only that publication provided).

And this is of course unique for each user, so you can not batch from oplog to ddp.

@KoenLav (Contributor, Author) commented Aug 30, 2020

Good point!

So what you're saying is that the oplog messages need to be filtered at some point, which definitely requires looping over them, but this doesn't require sending the DDP messages one by one.

Oplog changes and DDP messages are definitely not the same, we agree on that :)

Still happy to help with improving the performance of Meteor.

@sebakerckhof (Contributor) commented Aug 31, 2020

Yes, so just to give a more concrete example.

Let's say a user is subscribed to 2 publications, returning these cursors:

Notifications.find({}, { fields: { a: 1, b: 1, c: 1 } }); // pub 1
Notifications.find({ b: 1 }, { fields: { b: 1, c: 1, d: 1 } }); // pub 2

Now suppose the following actions happen on the Notifications collection, one after another.

First:

Notifications.insert({ _id: 'abc', a: 0, b: 0, c: 0, d: 0 });

This will result in the following cursor events (from the publications):

pub 1: added { _id: 'abc', a: 0, b: 0, c: 0 }

And the following DDP message:

collection notifications: added { _id: 'abc', a: 0, b: 0, c: 0 }

Then:

Notifications.update('abc', { $set: { b: 1 } });

This will result in the following cursor events (from the publications):

pub 1: changed - fields: { b: 1 }
pub 2: added { _id: 'abc', b: 1, c: 0, d: 0 }

And the following DDP messages:

collection notifications: changed - fields: { b: 1 }
collection notifications: changed - fields: { d: 0 }

Then:

Notifications.update('abc', { $set: { a: 1, c: 1 } });

This will result in the following cursor events (from the publications):

pub 1: changed - fields: { a: 1, c: 1 }
pub 2: changed - fields: { c: 1 }

And the following DDP message:

collection notifications: changed - fields: { a: 1, c: 1 }

Then:

Notifications.update('abc', { $set: { b: 2 } });

This will result in the following cursor events (from the publications):

pub 1: changed - fields: { b: 2 }
pub 2: removed - _id: 'abc'

And the following DDP messages:

collection notifications: changed - fields: { b: 2 }
collection notifications: changed - unset: ['d']

And these DDP messages would be different for each user.

@radekmie (Collaborator) commented Jul 9, 2021

We've experimented with this change in our apps and described our findings here: https://forums.meteor.com/t/performance-and-stability-problems-at-scale/56230. Most importantly: it really helps. If we could help with getting this one merged, do let me know.

@KoenLav (Contributor, Author) commented Jul 9, 2021

We would still like to see this get merged too, hope someone finally finds/prioritizes some time to look into it from the Meteor side.

@StorytellerCZ (Collaborator) commented:

@radekmie @KoenLav I will fix the merge conflict today. Then we need to make the tests pass and then we can merge it.

@StorytellerCZ (Collaborator) commented:

@KoenLav can you please take a look on the failing tests? I would love to merge this soon.

@KoenLav (Contributor, Author) commented Jul 27, 2021

@StorytellerCZ I'd like to, but I would definitely need some help (I don't have any experience with the tests, and quite a few seem to be failing).

What I wrote at the time was:

Tests are likely to fail because not all parts of the code that make use of DDP expect/understand message Arrays instead of singular messages; I will definitely need some help pinpointing these issues.

I took another look just now and I think:

  • The frontend tests just need to be rewritten to understand Arrays;
  • The backend tests check for things ending up in the oplog, but they don't understand this is being buffered now (I think);
  • The package catalog tests need to be rewritten to understand Array messages;

A lot of these problems could probably be resolved by NOT batching for tests, but then we would need to write some specific tests which do enable batching and also test the functionality.

@filipenevola (Collaborator) commented:

Hi @KoenLav, how much work would it be to keep batching disabled by default?

In that case we could avoid all the possible breaking changes, and only new tests would be necessary, as the current tests should still work as before.

It would be like this PR #11368 where the current behavior is preserved but new options are added when optimizations are necessary.

I would like to move this PR forward, especially after the real-world test from @radekmie, but it would also be nice to avoid breaking changes and to keep a way to preserve the current behavior.

@KoenLav (Contributor, Author) commented Aug 3, 2021

@filipenevola I've taken a quick look and added a global (per server instance, for all connected clients) option:

DDPCommon.ALLOW_BUFFERING = false;

And a local (per client) option:

bufferedMessagesInterval: 0,

This should allow us to run the tests without buffering enabled and start adding tests which actually test buffering.
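For anyone who wants to try this locally, a hedged usage sketch of the global flag (the import path assumes DDPCommon is reachable via the ddp-common package, as in Meteor core; exactly where the per-client bufferedMessagesInterval option is wired in is not shown here):

// Hedged sketch: turn server-side DDP message buffering off globally.
import { Meteor } from 'meteor/meteor';
import { DDPCommon } from 'meteor/ddp-common';

Meteor.startup(() => {
  DDPCommon.ALLOW_BUFFERING = false; // revert to one DDP message per change
});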

ps I think a few tests might still fail because currently the Crossbar implementation isn't 100% backwards compatible yet (as in it buffers for listeners as well, which causes listeners to only fire once the buffer is flushed). I'll look into this later.

@KoenLav (Contributor, Author) commented Aug 4, 2021

The remaining failing tests are probably all caused by changes to how we process the oplog.

I could try using the old logic when ALLOW_BUFFERING = false, but I don't think it will be as easy as disabling the batched DDP messages.

Probably won't have time to look into this for another week or two, so if anyone else could take a look (at either fixing one of the "broken" tests or using the old logic) that would be much appreciated.

Ps I think the tests rely on Meteor processing changes from the Oplog right away, so if we could check for the result after bufferInterval instead of immediately they would probably be fixed (fixing one test would probably mean fixing 80%+ of them).

@StorytellerCZ StorytellerCZ modified the milestones: Release 2.4, Release 2.5 Aug 5, 2021
@StorytellerCZ StorytellerCZ linked an issue Aug 19, 2021 that may be closed by this pull request
@filipenevola filipenevola modified the milestones: Release 2.5, Release 2.6 Sep 23, 2021
@filipenevola (Collaborator) commented:

I'm closing this PR because we haven't heard anything from the original poster for a while and the tests are failing.

@ignl commented Nov 20, 2021

This PR seemed so close; maybe there is still a chance to reopen it so @KoenLav can take a look when he has time? IMO any performance improvement to Meteor is very valuable.

@KoenLav (Contributor, Author) commented Nov 21, 2021

I definitely intend to take another look at it, at some point.

But I'd really appreciate some help on fixing the tests :)

@filipenevola filipenevola removed this from the Release 2.6 milestone Jan 27, 2022
Labels
in-discussion (We are still discussing how to solve or implement it), pending-tests (Tests are not passing, stuck or we need new tests), Project:DDP, Project:Mongo Driver
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Memory Leak/Fiber Bomb when EJSON type not declared
[1.6.1] 100% CPU on Ubuntu server, caused by client?