Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Done() method to Token for use with select #447

Merged
merged 1 commit into from Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions messageids.go
Expand Up @@ -98,14 +98,23 @@ type DummyToken struct {
id uint16
}

// Wait implements the Token Wait method.
func (d *DummyToken) Wait() bool {
return true
}

// WaitTimeout implements the Token WaitTimeout method.
func (d *DummyToken) WaitTimeout(t time.Duration) bool {
return true
}

// Done implements the Token Done method.
func (d *DummyToken) Done() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}

func (d *DummyToken) flowComplete() {
ERROR.Printf("A lookup for token %d returned nil\n", d.id)
}
Expand All @@ -123,14 +132,23 @@ type PlaceHolderToken struct {
id uint16
}

// Wait implements the Token Wait method.
func (p *PlaceHolderToken) Wait() bool {
return true
}

// WaitTimeout implements the Token WaitTimeout method.
func (p *PlaceHolderToken) WaitTimeout(t time.Duration) bool {
return true
}

// Done implements the Token Done method.
func (p *PlaceHolderToken) Done() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}

func (p *PlaceHolderToken) flowComplete() {
}

Expand Down
29 changes: 23 additions & 6 deletions token.go
Expand Up @@ -31,8 +31,24 @@ type PacketAndToken struct {
// Token defines the interface for the tokens used to indicate when
// actions have completed.
type Token interface {
// Wait will wait indefinitely for the Token to complete, ie the Publish
// to be sent and confirmed receipt from the broker.
Wait() bool

// WaitTimeout takes a time.Duration to wait for the flow associated with the
// Token to complete, returns true if it returned before the timeout or
// returns false if the timeout occurred. In the case of a timeout the Token
// does not have an error set in case the caller wishes to wait again.
WaitTimeout(time.Duration) bool

// Done returns a channel that is closed when the flow associated
// with the Token completes. Clients should call Error after the
// channel is closed to check if the flow completed successfully.
//
// Done is provided for use in select statements. Simple use cases may
// use Wait or WaitTimeout.
Done() <-chan struct{}

Error() error
}

Expand All @@ -52,17 +68,13 @@ type baseToken struct {
err error
}

// Wait will wait indefinitely for the Token to complete, ie the Publish
// to be sent and confirmed receipt from the broker
// Wait implements the Token Wait method.
func (b *baseToken) Wait() bool {
<-b.complete
return true
}

// WaitTimeout takes a time.Duration to wait for the flow associated with the
// Token to complete, returns true if it returned before the timeout or
// returns false if the timeout occurred. In the case of a timeout the Token
// does not have an error set in case the caller wishes to wait again
// WaitTimeout implements the Token WaitTimeout method.
func (b *baseToken) WaitTimeout(d time.Duration) bool {
timer := time.NewTimer(d)
select {
Expand All @@ -77,6 +89,11 @@ func (b *baseToken) WaitTimeout(d time.Duration) bool {
return false
}

// Done implements the Token Done method.
func (b *baseToken) Done() <-chan struct{} {
return b.complete
}

func (b *baseToken) flowComplete() {
select {
case <-b.complete:
Expand Down