Skip to content

Commit

Permalink
Change way to close update chan
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 8, 2022
1 parent e6ee279 commit b6d9b13
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 19 deletions.
17 changes: 5 additions & 12 deletions kv.go
Expand Up @@ -20,7 +20,6 @@ import (
"regexp"
"strconv"
"strings"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -596,7 +595,6 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
type watcher struct {
updates chan KeyValueEntry
sub *Subscription
closed uint32
}

// Updates returns the interior channel.
Expand All @@ -607,20 +605,12 @@ func (w *watcher) Updates() <-chan KeyValueEntry {
return w.updates
}

func (w *watcher) close() {
if atomic.CompareAndSwapUint32(&w.closed, 0, 1) {
close(w.updates)
}
}

// Stop will unsubscribe from the watcher.
func (w *watcher) Stop() error {
if w == nil {
return nil
}
err := w.sub.Unsubscribe()
w.close()
return err
return w.sub.Unsubscribe()
}

// WatchAll watches all keys.
Expand Down Expand Up @@ -715,7 +705,10 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
// Track watcher.
sub.jsi.w = w

// Set us up to close when the waitForMessages func returns.
sub.pDone = func() {
close(w.updates)
}
// Check on pending count.
if sub.jsi.pending == 0 {
initDoneMarker = true
Expand Down
14 changes: 7 additions & 7 deletions nats.go
Expand Up @@ -587,6 +587,7 @@ type Subscription struct {
pHead *Msg
pTail *Msg
pCond *sync.Cond
pDone func()

// Pending stats, async subscriptions, high-speed etc.
pMsgs int
Expand Down Expand Up @@ -2697,7 +2698,13 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
}
s.pHead = m.next
}
// Now check for pDone
done := s.pDone
s.mu.Unlock()

if done != nil {
done()
}
}

// Used for debugging and simulating loss for certain tests.
Expand Down Expand Up @@ -3946,13 +3953,6 @@ func (nc *Conn) removeSub(s *Subscription) {
if s.pCond != nil {
s.pCond.Broadcast()
}

// Check for watchers.
if jsi != nil && jsi.w != nil {
// Check on any watcher. If we have one close the update chan.
jsi.w.close()
jsi.w = nil
}
}

// SubscriptionType is the type of the Subscription.
Expand Down

0 comments on commit b6d9b13

Please sign in to comment.