Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue with sending messages in batches #196

Closed
dofrisec opened this issue Oct 27, 2021 · 2 comments
Closed

Issue with sending messages in batches #196

dofrisec opened this issue Oct 27, 2021 · 2 comments
Labels

Comments

@dofrisec
Copy link

Hi.

After updating from an older version to v3.7.0, I encountered an issue where messages will stop being sent in certain cases.
I've narrowed it down, and the issue first occurs with v3.5.2, where the batch sending is introduced.

I send around 4000 messages to the same channel in a loop, and await all the sendToQueue(...) calls via Promise.all. After successfully sending most of them, ~3500 or so, the processing just stops, and none of the remaining messages are sent.

If I then send more messages to the channel, the processing starts again and the remaining messages are sent.

Apparently, the logic in _publishQueuedMessages(...) has a flaw.
It looks something like this:

private _publishQueuedMessages(workerNumber: number): void {
        const channel = this._channel;
        if (
            !channel ||
            !this._shouldPublish() ||
            !this._working ||
            workerNumber !== this._workerNumber
        ) {
            // Can't publish anything right now...
            this._working = false;
            return;
        }

        try {
            // Send messages in batches of 1000 - don't want to starve the event loop.
            let sendsLeft = MAX_MESSAGES_PER_BATCH;
            while (this._channelHasRoom && this._messages.length > 0 && sendsLeft > 0) {

                // <snip --- send some messages ... ---  >

            }

            // If we didn't send all the messages, send some more...
            if (this._channelHasRoom && this._messages.length > 0) {
                setImmediate(() => this._publishQueuedMessages(workerNumber));
            }

            this._working = false;

            /* istanbul ignore next */
        } catch (err) {
            this._working = false;
            this.emit('error', err);
        }
    }

I'm guessing that this should trigger the sending of a new batch:
setImmediate(() => this._publishQueuedMessages(workerNumber));

but this._working = false; happens unconditionally, so when _publishQueuedMessages is called on the next tick, then the tests at the start of the method will cause it to return immediately without doing anything.

I've confirmed that doing this instead will make the issue go away:

            // If we didn't send all the messages, send some more...
            if (this._channelHasRoom && this._messages.length > 0) {
                setImmediate(() => this._publishQueuedMessages(workerNumber));
            } else {
                this._working = false;
            }

But I'm not sure if this is a good general fix.

@luddd3 luddd3 closed this as completed in 6a5b589 Dec 29, 2021
@luddd3
Copy link
Collaborator

luddd3 commented Dec 29, 2021

I think your fix is good. Thank you!

github-actions bot pushed a commit that referenced this issue Dec 29, 2021
## [3.8.1](v3.8.0...v3.8.1) (2021-12-29)

### Bug Fixes

* batch sending stops ([6a5b589](6a5b589)), closes [#196](#196)
@jwalton
Copy link
Owner

jwalton commented Dec 29, 2021

🎉 This issue has been resolved in version 3.8.1 🎉

The release is available on:

Your semantic-release bot 📦🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants