Skip to content

Commit

Permalink
Merge pull request #791 from nats-io/rework_pull_sub
Browse files Browse the repository at this point in the history
Reworked PullSubscribe implementation
  • Loading branch information
kozlovic committed Aug 10, 2021
2 parents be40aac + cfef5e6 commit d1955c8
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 312 deletions.
22 changes: 18 additions & 4 deletions context.go
Expand Up @@ -110,10 +110,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, dat
return s.NextMsgWithContext(ctx)
}

// NextMsgWithContext takes a context and returns the next message
// available to a synchronous subscriber, blocking until it is delivered
// or context gets canceled.
func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal, waitIfNoMsg bool) (*Msg, error) {
if ctx == nil {
return nil, ErrInvalidContext
}
Expand All @@ -126,6 +123,11 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {

s.mu.Lock()
err := s.validateNextMsgState()
// Unless this is from an internal call, reject use of this API.
// Users should use Fetch() instead.
if err == nil && !pullSubInternal && s.jsi != nil && s.jsi.pull {
err = ErrTypeSubscription
}
if err != nil {
s.mu.Unlock()
return nil, err
Expand All @@ -150,6 +152,11 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
return msg, nil
}
default:
// If internal and we don't want to wait, signal that there is no
// message in the internal queue.
if pullSubInternal && !waitIfNoMsg {
return nil, errNoMessages
}
}

select {
Expand All @@ -167,6 +174,13 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
return msg, nil
}

// NextMsgWithContext takes a context and returns the next message
// available to a synchronous subscriber, blocking until it is delivered
// or context gets canceled.
func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
return s.nextMsgWithContext(ctx, false, true)
}

// FlushWithContext will allow a context to control the duration
// of a Flush() call. This context should be non-nil and should
// have a deadline set. We will return an error if none is present.
Expand Down

0 comments on commit d1955c8

Please sign in to comment.