From b6d9b13ba5d45aded07a872b4fdfd59eec59a060 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 7 Feb 2022 16:27:31 -0800 Subject: [PATCH] Change way to close update chan Signed-off-by: Derek Collison --- kv.go | 17 +++++------------ nats.go | 14 +++++++------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/kv.go b/kv.go index 76e2ebf2f..3afef76cb 100644 --- a/kv.go +++ b/kv.go @@ -20,7 +20,6 @@ import ( "regexp" "strconv" "strings" - "sync/atomic" "time" ) @@ -596,7 +595,6 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) { type watcher struct { updates chan KeyValueEntry sub *Subscription - closed uint32 } // Updates returns the interior channel. @@ -607,20 +605,12 @@ func (w *watcher) Updates() <-chan KeyValueEntry { return w.updates } -func (w *watcher) close() { - if atomic.CompareAndSwapUint32(&w.closed, 0, 1) { - close(w.updates) - } -} - // Stop will unsubscribe from the watcher. func (w *watcher) Stop() error { if w == nil { return nil } - err := w.sub.Unsubscribe() - w.close() - return err + return w.sub.Unsubscribe() } // WatchAll watches all keys. @@ -715,7 +705,10 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { } // Track watcher. sub.jsi.w = w - + // Set us up to close when the waitForMessages func returns. + sub.pDone = func() { + close(w.updates) + } // Check on pending count. if sub.jsi.pending == 0 { initDoneMarker = true diff --git a/nats.go b/nats.go index 877427b76..91208d06d 100644 --- a/nats.go +++ b/nats.go @@ -587,6 +587,7 @@ type Subscription struct { pHead *Msg pTail *Msg pCond *sync.Cond + pDone func() // Pending stats, async subscriptions, high-speed etc. pMsgs int @@ -2697,7 +2698,13 @@ func (nc *Conn) waitForMsgs(s *Subscription) { } s.pHead = m.next } + // Now check for pDone + done := s.pDone s.mu.Unlock() + + if done != nil { + done() + } } // Used for debugging and simulating loss for certain tests. @@ -3946,13 +3953,6 @@ func (nc *Conn) removeSub(s *Subscription) { if s.pCond != nil { s.pCond.Broadcast() } - - // Check for watchers. - if jsi != nil && jsi.w != nil { - // Check on any watcher. If we have one close the update chan. - jsi.w.close() - jsi.w = nil - } } // SubscriptionType is the type of the Subscription.