-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Heartbeats #239
base: v2
Are you sure you want to change the base?
Heartbeats #239
Conversation
N.B. Failing test on stopping the heartbeater gracefully. Signed-off-by: Alex Blease <ablease@vmware.com>
type DoneChan struct { | ||
C chan struct{} | ||
closed bool | ||
mutex sync.Mutex | ||
} | ||
|
||
func NewDoneChan() *DoneChan { | ||
return &DoneChan{C: make(chan struct{})} | ||
} | ||
|
||
// GracefulClose closes the DoneChan only if the Done chan is not already closed. | ||
func (dc *DoneChan) GracefulClose() { | ||
dc.mutex.Lock() | ||
defer dc.mutex.Unlock() | ||
|
||
if !dc.closed { | ||
close(dc.C) | ||
dc.closed = true | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to make DoneChan
unexported, since we don't expect users to use this, and its usage will be scoped to the stream
package.
The graceful close is fine, it works as expected and it should not panic. A simpler implementation, since we are already importing sync
, would be to use a sync.Once
in heartBeater.stop()
to close the channel. You won't need a mutex or a new type, because the Once
will take care of closing the channel just once.
Either way is fine by me, as long as we un-export the DoneChan
type.
type heartBeater struct { | ||
logger *slog.Logger | ||
client raw.Clienter | ||
tickDuration time.Duration | ||
ticker *time.Ticker | ||
done *DoneChan | ||
receiveCh <-chan *raw.Heartbeat | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The raw.Clienter
will be shared between consumers and producers, and it is not thread-safe. We will need a pointer to a mutex to ensure synchronised access to the raw client, and pass a shared mutex.
func (hb *heartBeater) send() { | ||
err := hb.client.SendHeartbeat() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have to ensure that access to hb.client
is synchronised
@Zerpet I put in a basic graceful close mechanism for the Heartbeat done channel. I might still be susceptible to a race condition. The test passes without panicking at least!
I'll make another test case to call the locator operation for sending a heartbeat shortly (to pass the failing static check)
I had tried to incorporate the gleak leak detector. I see you tried as well, but I couldnt not get it to fail? Let me know what you think about having a gleak test.