Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Heartbeats #239

Open
wants to merge 3 commits into
base: v2
Choose a base branch
from
Open

Heartbeats #239

wants to merge 3 commits into from

Conversation

ablease
Copy link

@ablease ablease commented Sep 28, 2023

@Zerpet I put in a basic graceful close mechanism for the Heartbeat done channel. I might still be susceptible to a race condition. The test passes without panicking at least!

I'll make another test case to call the locator operation for sending a heartbeat shortly (to pass the failing static check)

I had tried to incorporate the gleak leak detector. I see you tried as well, but I couldnt not get it to fail? Let me know what you think about having a gleak test.

N.B. Failing test on stopping the heartbeater gracefully.

Signed-off-by: Alex Blease <ablease@vmware.com>
@ablease ablease marked this pull request as ready for review October 10, 2023 09:47
Comment on lines +19 to +38
type DoneChan struct {
C chan struct{}
closed bool
mutex sync.Mutex
}

func NewDoneChan() *DoneChan {
return &DoneChan{C: make(chan struct{})}
}

// GracefulClose closes the DoneChan only if the Done chan is not already closed.
func (dc *DoneChan) GracefulClose() {
dc.mutex.Lock()
defer dc.mutex.Unlock()

if !dc.closed {
close(dc.C)
dc.closed = true
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to make DoneChan unexported, since we don't expect users to use this, and its usage will be scoped to the stream package.

The graceful close is fine, it works as expected and it should not panic. A simpler implementation, since we are already importing sync, would be to use a sync.Once in heartBeater.stop() to close the channel. You won't need a mutex or a new type, because the Once will take care of closing the channel just once.

Either way is fine by me, as long as we un-export the DoneChan type.

Comment on lines +10 to +17
type heartBeater struct {
logger *slog.Logger
client raw.Clienter
tickDuration time.Duration
ticker *time.Ticker
done *DoneChan
receiveCh <-chan *raw.Heartbeat
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The raw.Clienter will be shared between consumers and producers, and it is not thread-safe. We will need a pointer to a mutex to ensure synchronised access to the raw client, and pass a shared mutex.

Comment on lines +80 to +81
func (hb *heartBeater) send() {
err := hb.client.SendHeartbeat()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to ensure that access to hb.client is synchronised

@Zerpet Zerpet self-assigned this Oct 17, 2023
@Zerpet Zerpet added this to the 2.0 milestone Oct 17, 2023
@Zerpet Zerpet added the v2.x Applicable to v2.x issues label Oct 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
v2.x Applicable to v2.x issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants