Skip to content

Commit

Permalink
Merge pull request #447 from rittneje/feature/add-Done-method-to-Token
Browse files Browse the repository at this point in the history
Add Done() method to Token for use with select
  • Loading branch information
Al S-M committed Sep 18, 2020
2 parents 61824aa + 8d7529a commit 4b393de
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
18 changes: 18 additions & 0 deletions messageids.go
Expand Up @@ -115,14 +115,23 @@ type DummyToken struct {
id uint16
}

// Wait implements the Token Wait method.
func (d *DummyToken) Wait() bool {
return true
}

// WaitTimeout implements the Token WaitTimeout method.
func (d *DummyToken) WaitTimeout(t time.Duration) bool {
return true
}

// Done implements the Token Done method.
func (d *DummyToken) Done() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}

func (d *DummyToken) flowComplete() {
ERROR.Printf("A lookup for token %d returned nil\n", d.id)
}
Expand All @@ -140,14 +149,23 @@ type PlaceHolderToken struct {
id uint16
}

// Wait implements the Token Wait method.
func (p *PlaceHolderToken) Wait() bool {
return true
}

// WaitTimeout implements the Token WaitTimeout method.
func (p *PlaceHolderToken) WaitTimeout(t time.Duration) bool {
return true
}

// Done implements the Token Done method.
func (p *PlaceHolderToken) Done() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}

func (p *PlaceHolderToken) flowComplete() {
}

Expand Down
29 changes: 23 additions & 6 deletions token.go
Expand Up @@ -31,8 +31,24 @@ type PacketAndToken struct {
// Token defines the interface for the tokens used to indicate when
// actions have completed.
type Token interface {
// Wait will wait indefinitely for the Token to complete, ie the Publish
// to be sent and confirmed receipt from the broker.
Wait() bool

// WaitTimeout takes a time.Duration to wait for the flow associated with the
// Token to complete, returns true if it returned before the timeout or
// returns false if the timeout occurred. In the case of a timeout the Token
// does not have an error set in case the caller wishes to wait again.
WaitTimeout(time.Duration) bool

// Done returns a channel that is closed when the flow associated
// with the Token completes. Clients should call Error after the
// channel is closed to check if the flow completed successfully.
//
// Done is provided for use in select statements. Simple use cases may
// use Wait or WaitTimeout.
Done() <-chan struct{}

Error() error
}

Expand All @@ -52,17 +68,13 @@ type baseToken struct {
err error
}

// Wait will wait indefinitely for the Token to complete, ie the Publish
// to be sent and confirmed receipt from the broker
// Wait implements the Token Wait method.
func (b *baseToken) Wait() bool {
<-b.complete
return true
}

// WaitTimeout takes a time.Duration to wait for the flow associated with the
// Token to complete, returns true if it returned before the timeout or
// returns false if the timeout occurred. In the case of a timeout the Token
// does not have an error set in case the caller wishes to wait again
// WaitTimeout implements the Token WaitTimeout method.
func (b *baseToken) WaitTimeout(d time.Duration) bool {
timer := time.NewTimer(d)
select {
Expand All @@ -77,6 +89,11 @@ func (b *baseToken) WaitTimeout(d time.Duration) bool {
return false
}

// Done implements the Token Done method.
func (b *baseToken) Done() <-chan struct{} {
return b.complete
}

func (b *baseToken) flowComplete() {
select {
case <-b.complete:
Expand Down

0 comments on commit 4b393de

Please sign in to comment.