Skip to content

Commit

Permalink
Do not nil chan, remove mutex
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 7, 2022
1 parent 16c1c76 commit 3987a74
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 29 deletions.
34 changes: 11 additions & 23 deletions kv.go
Expand Up @@ -20,7 +20,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -594,41 +594,37 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {

// Implementation for Watch
type watcher struct {
mu sync.Mutex
updates chan KeyValueEntry
sub *Subscription
closed uint32
}

// Updates returns the interior channel.
func (w *watcher) Updates() <-chan KeyValueEntry {
if w == nil {
return nil
}
w.mu.Lock()
defer w.mu.Unlock()
return w.updates
}

// close the update chan.
func (w *watcher) close() {
if w == nil {
return
}
w.mu.Lock()
if w.updates != nil {
if atomic.CompareAndSwapUint32(&w.closed, 0, 1) {
close(w.updates)
w.updates = nil
}
w.mu.Unlock()
}

func (w *watcher) isClosed() bool {
return atomic.LoadUint32(&w.closed) > 0
}

// Stop will unsubscribe from the watcher.
func (w *watcher) Stop() error {
if w == nil {
return nil
}
err := w.sub.Unsubscribe()
w.close()
return w.sub.Unsubscribe()
return err
}

// WatchAll watches all keys.
Expand Down Expand Up @@ -690,11 +686,7 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
delta: delta,
op: op,
}
w.mu.Lock()
if w.updates != nil {
w.updates <- entry
}
w.mu.Unlock()
w.updates <- entry
}
// Check if done and initial values.
if !initDoneMarker {
Expand All @@ -705,11 +697,7 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
if received > initPending || delta == 0 {
initDoneMarker = true
w.mu.Lock()
if w.updates != nil {
w.updates <- nil
}
w.mu.Unlock()
w.updates <- nil
}
}
}
Expand Down
15 changes: 9 additions & 6 deletions nats.go
Expand Up @@ -3929,7 +3929,8 @@ func (nc *Conn) removeSub(s *Subscription) {
s.mch = nil

// If JS subscription then stop HB timer.
if jsi := s.jsi; jsi != nil {
jsi := s.jsi
if jsi != nil {
if jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
Expand All @@ -3938,18 +3939,20 @@ func (nc *Conn) removeSub(s *Subscription) {
jsi.csfct.Stop()
jsi.csfct = nil
}
// Check on any watcher. If we have one close the update chan.
if jsi.w != nil {
jsi.w.close()
jsi.w = nil
}
}

// Mark as invalid
s.closed = true
if s.pCond != nil {
s.pCond.Broadcast()
}

// Check for watchers.
if jsi != nil && jsi.w != nil {
// Check on any watcher. If we have one close the update chan.
jsi.w.close()
jsi.w = nil
}
}

// SubscriptionType is the type of the Subscription.
Expand Down

0 comments on commit 3987a74

Please sign in to comment.