
jetstream: fix datarace when using consumer #1279 #1291

Merged: 4 commits, merged on Jun 7, 2023.


alexisvisco
Contributor

Hi, I opened issue #1279 and gave fixing it a try. It seems that some code using the `pending` field of the pull struct was not protected by the lock at certain points.

I fixed it with the help of the data race stack trace.
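For readers unfamiliar with this failure mode, here is a minimal, self-contained sketch (not the nats.go code) of the kind of fix described: a hypothetical `pullSub` type embeds `sync.Mutex`, one set of goroutines updates the `pending` counters and another reads them. Because both the write path and the read path take the same lock, `go run -race` has nothing to report; dropping the `Lock`/`Unlock` around the read in `belowThreshold` is the sort of change that reintroduces the race. The type, field, and method names are assumptions chosen to mirror the discussion.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingMsgs is a hypothetical stand-in for the counters discussed in the PR.
type pendingMsgs struct {
	msgCount  int
	byteCount int
}

// pullSub is a hypothetical subscription type; the embedded mutex guards pending.
type pullSub struct {
	sync.Mutex
	pending pendingMsgs
}

// onMessage simulates the message handler decrementing the pending counters.
func (s *pullSub) onMessage(size int) {
	s.Lock()
	defer s.Unlock()
	s.pending.msgCount--
	s.pending.byteCount -= size
}

// belowThreshold reads pending under the same lock, so it never races with onMessage.
func (s *pullSub) belowThreshold(thresholdMsgs int) bool {
	s.Lock()
	defer s.Unlock()
	return s.pending.msgCount < thresholdMsgs
}

// run starts concurrent writers and readers, then returns the final state.
func run() (pendingMsgs, bool) {
	s := &pullSub{pending: pendingMsgs{msgCount: 100, byteCount: 1000}}
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); s.onMessage(10) }()
		go func() { defer wg.Done(); _ = s.belowThreshold(60) }()
	}
	wg.Wait()
	s.Lock()
	defer s.Unlock()
	return s.pending, s.pending.msgCount < 60
}

func main() {
	p, below := run()
	fmt.Println(p.msgCount, p.byteCount, below) // 50 500 true
}
```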

```diff
-if sub.pending.msgCount < consumeOpts.ThresholdMessages ||
-(sub.pending.byteCount < consumeOpts.ThresholdBytes && sub.consumeOpts.MaxBytes != 0) &&
+sub.Lock()
+pending := sub.pending
```
Contributor Author

I copy the value of `pending` because we call `resetPendingMsgs` next, and that function acquires the lock itself, so I can't use a `Lock`/`defer Unlock` pair here. Besides, we are already inside a deferred function, so deferring again would seem overly complicated.
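The approach in this comment can be sketched as follows, assuming a hypothetical `resetPendingMsgs` that acquires the lock itself, as the comment describes: copy `pending` while holding the lock, release the lock, and only then call the method that locks internally. Calling it while still holding a `sync.Mutex` would deadlock, because Go mutexes are not reentrant. The types, the `batchSize` and `resets` fields, and the threshold logic are simplified assumptions, not the actual nats.go implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingMsgs is a hypothetical stand-in for the pending counters.
type pendingMsgs struct {
	msgCount  int
	byteCount int
}

type pullSub struct {
	sync.Mutex
	pending   pendingMsgs
	batchSize int // hypothetical: messages requested per pull
	resets    int // hypothetical: counts calls to resetPendingMsgs
}

// resetPendingMsgs locks internally, mirroring the behaviour described in the PR.
func (s *pullSub) resetPendingMsgs() {
	s.Lock()
	defer s.Unlock()
	s.pending = pendingMsgs{msgCount: s.batchSize}
	s.resets++
}

// checkPending copies pending under the lock, unlocks, then acts on the copy.
// Holding the lock across resetPendingMsgs would deadlock.
func (s *pullSub) checkPending(thresholdMsgs int) {
	s.Lock()
	pending := s.pending // snapshot taken while the lock is held
	s.Unlock()

	if pending.msgCount < thresholdMsgs {
		s.resetPendingMsgs() // safe: the lock is no longer held here
	}
}

func main() {
	s := &pullSub{pending: pendingMsgs{msgCount: 2}, batchSize: 10}
	s.checkPending(5)
	fmt.Println(s.pending.msgCount, s.resets) // 10 1
}
```

The trade-off of this pattern is that the decision is made on a snapshot that another goroutine could change between `Unlock` and `resetPendingMsgs`, so it only fits where acting on a slightly stale value is harmless.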

```diff
-if err := sub.handleStatusMsg(msg, msgErr); err != nil {
+sub.Lock()
+err := sub.handleStatusMsg(msg, msgErr)
```
Contributor Author

I can't put the lock inside `handleStatusMsg` because it is also used in `Next`, which is locked as a whole.

Collaborator

Yeah, unfortunately `Next()` has to be locked as a whole, which is a bit painful given that the logic for handling `pendingMsgs` and status messages is shared between `Consume` and `Next`. I'll probably clean that up for the next release, but for now this will work.

Contributor Author

A potential trick (to avoid refactoring the whole logic) is to use `TryLock`, but the Go team advises against it because its use usually means something is wrong.

From https://pkg.go.dev/sync#Mutex.TryLock: "Note that while correct uses of TryLock do exist, they are rare, and use of TryLock is often a sign of a deeper problem in a particular use of mutexes."

Collaborator

I'm not a fan of using `TryLock()`. I'd rather go through the code and try to clean it up :)

bruth requested a review from piotrpio on June 5, 2023 10:42
Collaborator

piotrpio left a comment

One minor nit; other than that, looks great!

```diff
@@ -642,7 +652,8 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
 }
 userMsg, err := checkMsg(msg)
 if err != nil {
-if !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, ErrNoMessages) && !errors.Is(err, ErrMaxBytesExceeded) {
+if !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, ErrNoMessages) && !errors.Is(err,
+ErrMaxBytesExceeded) {
```
Collaborator

Moving a single argument to a new line looks a bit ugly: either leave everything on a single line or split the conditions onto separate lines.

Contributor Author

Yes, it looks like my formatter is doing something crazy.

@alexisvisco
Contributor Author

@piotrpio the PR looks good; I think we just need to restart the job, because Travis gives us:

```
Unable to start NATS Server in Go Routine [recovered]
	panic: Unable to start NATS Server in Go Routine
```

Collaborator

piotrpio left a comment

@alexisvisco, yeah, that's not related to your PR; I'll be fixing those tests. LGTM!

piotrpio merged commit 0d68857 into nats-io:main on Jun 7, 2023
1 check failed
piotrpio mentioned this pull request on Jun 12, 2023

2 participants