jetstream: fix datarace when using consumer #1279 #1291
- if sub.pending.msgCount < consumeOpts.ThresholdMessages ||
-     (sub.pending.byteCount < consumeOpts.ThresholdBytes && sub.consumeOpts.MaxBytes != 0) &&
+ sub.Lock()
+ pending := sub.pending
I'm copying the value of pending because we call resetPendingMsgs next, and that function takes the lock itself, so I can't use a lock/defer-unlock pair here. In addition, we are already inside a deferred call, so deferring again might seem overly complicated.
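The copy-then-unlock pattern described above can be sketched as follows. This is a minimal, hypothetical model rather than the actual `jetstream` code: the `pullSubscription` and `pendingMsgs` types, the simplified threshold check, and the `checkThresholds` name are all assumptions. The point it illustrates is that `sync.Mutex` is not reentrant, so the snapshot of `pending` is taken under the lock and the lock is released before calling a method that acquires it again.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingMsgs is a hypothetical stand-in for the consumer's pending counters.
type pendingMsgs struct {
	msgCount  int
	byteCount int
}

// pullSubscription is a hypothetical subscription guarded by an embedded mutex.
type pullSubscription struct {
	sync.Mutex
	pending pendingMsgs
}

// resetPendingMsgs acquires the lock itself, so callers must NOT hold it.
func (s *pullSubscription) resetPendingMsgs() {
	s.Lock()
	defer s.Unlock()
	s.pending = pendingMsgs{}
}

// checkThresholds (hypothetical, simplified) copies pending under the lock,
// releases the lock, and only then calls resetPendingMsgs. Holding the lock
// across that call would deadlock, because sync.Mutex is not reentrant.
func (s *pullSubscription) checkThresholds(thresholdMsgs, thresholdBytes int) bool {
	s.Lock()
	pending := s.pending // value copy: safe to read after Unlock
	s.Unlock()

	if pending.msgCount < thresholdMsgs || pending.byteCount < thresholdBytes {
		s.resetPendingMsgs()
		return true
	}
	return false
}

func main() {
	sub := &pullSubscription{pending: pendingMsgs{msgCount: 2, byteCount: 512}}
	fmt.Println(sub.checkThresholds(5, 0)) // true: 2 < 5, so pending is reset
	fmt.Println(sub.pending.msgCount)      // 0
}
```

Because `pending` is a plain struct, the assignment copies it by value, so the local snapshot stays consistent even if another goroutine updates `s.pending` after the unlock.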
- if err := sub.handleStatusMsg(msg, msgErr); err != nil {
+ sub.Lock()
+ err := sub.handleStatusMsg(msg, msgErr)
I can't put the lock inside handleStatusMsg because it is also used in Next, which holds the lock for its entire duration.
Yeah, unfortunately Next() has to be locked as a whole, which is a bit painful considering that the logic for handling pendingMsgs and status messages is shared between Consume and Next. I'll probably clean that up for the next release, but for now this will work.
A potential trick (to avoid refactoring the whole logic) is to use TryLock, but the Go team advises against it because needing it usually means something is wrong. From https://pkg.go.dev/sync#Mutex.TryLock: "Note that while correct uses of TryLock do exist, they are rare, and use of TryLock is often a sign of a deeper problem in a particular use of mutexes."
I'm not a fan of using TryLock(). I'd rather go through the code and clean it up :)
One minor nit; other than that, looks great!
jetstream/pull.go
@@ -642,7 +652,8 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
  }
  userMsg, err := checkMsg(msg)
  if err != nil {
- if !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, ErrNoMessages) && !errors.Is(err, ErrMaxBytesExceeded) {
+ if !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, ErrNoMessages) && !errors.Is(err,
+ ErrMaxBytesExceeded) {
Moving a single parameter to a new line looks a bit ugly; either leave everything on a single line or split the conditions onto separate lines.
Yes, it looks like my formatter is doing something crazy.
@piotrpio the PR looks good; I think we just need to restart the job because Travis gives us:
@alexisvisco, yeah, that's not related to your PR; I'll be fixing those tests. LGTM!
Hi, I opened issue #1279 and gave fixing it a try. It seems that some code using the pending field in the pull struct was not protected by the lock at certain points.
With the help of the data race stack trace, I fixed it.