Skip to content

Commit

Permalink
js: handle 408 fetch requests pending status
Browse files Browse the repository at this point in the history
Skip 408 errors thrown to client no wait + expires requests

Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed May 3, 2022
1 parent 9c077d0 commit 5be4ec5
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 11 deletions.
36 changes: 25 additions & 11 deletions js.go
Expand Up @@ -2414,12 +2414,20 @@ func PullMaxWaiting(n int) SubOpt {
})
}

var errNoMessages = errors.New("nats: no messages")
var (
// errNoMessages is an error that a Fetch request using no_wait can receive to signal
// that there are no more messages available.
errNoMessages = errors.New("nats: no messages")

// errRequestsPending is an error that represents a sub.Fetch requests that was using
// no_wait and expires time got discarded by the server.
errRequestsPending = errors.New("nats: requests pending")
)

// Returns if the given message is a user message or not, and if
// `checkSts` is true, returns appropriate error based on the
// content of the status (404, etc..)
func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) {
func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) {
// Assume user message
usrMsg = true

Expand Down Expand Up @@ -2448,11 +2456,17 @@ func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) {
// 404 indicates that there are no messages.
err = errNoMessages
case reqTimeoutSts:
// Older servers may send a 408 when a request in the server was expired
// and interest is still found, which will be the case for our
// implementation. Regardless, ignore 408 errors until receiving at least
// one message.
err = ErrTimeout
// In case of a fetch request with no wait request and expires time,
// need to skip 408 errors and retry.
if isNoWait {
err = errRequestsPending
} else {
// Older servers may send a 408 when a request in the server was expired
// and interest is still found, which will be the case for our
// implementation. Regardless, ignore 408 errors until receiving at least
// one message when making requests without no_wait.
err = ErrTimeout
}
default:
err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
}
Expand Down Expand Up @@ -2567,7 +2581,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// or status message, however, we don't care about values of status
// messages at this point in the Fetch() call, so checkMsg can't
// return an error.
if usrMsg, _ := checkMsg(msg, false); usrMsg {
if usrMsg, _ := checkMsg(msg, false, false); usrMsg {
msgs = append(msgs, msg)
}
}
Expand Down Expand Up @@ -2610,11 +2624,11 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if err == nil {
var usrMsg bool

usrMsg, err = checkMsg(msg, true)
usrMsg, err = checkMsg(msg, true, noWait)
if err == nil && usrMsg {
msgs = append(msgs, msg)
} else if noWait && (err == errNoMessages) && len(msgs) == 0 {
// If we have a 404 for our "no_wait" request and have
} else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 {
// If we have a 404/408 for our "no_wait" request and have
// not collected any message, then resend request to
// wait this time.
noWait = false
Expand Down
142 changes: 142 additions & 0 deletions test/js_test.go
Expand Up @@ -4382,6 +4382,15 @@ func TestJetStream_ClusterMultipleSubscribe(t *testing.T) {
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultiplePullSubscribe)
})

t.Run(fmt.Sprintf("psub n=%d r=%d multi fetch", n, r), func(t *testing.T) {
name := fmt.Sprintf("PFSUBN%d%d", n, r)
stream := &nats.StreamConfig{
Name: name,
Replicas: n,
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleFetchPullSubscribe)
})
}
}
}
Expand Down Expand Up @@ -4543,6 +4552,139 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr
}
}

func testJetStream_ClusterMultipleFetchPullSubscribe(t *testing.T, subject string, srvs ...*jsServer) {
srv := srvs[0]
nc, js := jsClient(t, srv.Server)
defer nc.Close()

var wg sync.WaitGroup
ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
defer done()

// Setup a number of subscriptions with different inboxes that will be
// fetching the messages in parallel.
nsubs := 4
subs := make([]*nats.Subscription, nsubs)
errCh := make(chan error, nsubs)
var queues sync.Map
for i := 0; i < nsubs; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
var sub *nats.Subscription
var err error
sub, err = js.PullSubscribe(subject, "shared")
if err != nil {
errCh <- err
} else {
subs[n] = sub
queues.Store(sub.Subject, make([]*nats.Msg, 0))
}
}(i)
}

// Publishing of messages happens after the subscriptions are ready.
// The subscribers will be fetching messages while these are being
// produced so sometimes there are not going to be messages available.
wg.Wait()
var (
total uint64 = 100
delivered uint64
batchSize = 2
)
go func() {
for i := 0; i < int(total); i++ {
js.Publish(subject, []byte(fmt.Sprintf("n:%v", i)))
time.Sleep(1 * time.Millisecond)
}
}()

ctx2, done2 := context.WithTimeout(ctx, 3*time.Second)
defer done2()

for _, psub := range subs {
if psub == nil {
continue
}
sub := psub
subject := sub.Subject
v, _ := queues.Load(sub.Subject)
queue := v.([]*nats.Msg)
go func() {
for {
select {
case <-ctx2.Done():
return
default:
}

if current := atomic.LoadUint64(&delivered); current >= total {
done2()
return
}

// Wait until all messages have been consumed.
for attempt := 0; attempt < 4; attempt++ {
recvd, err := sub.Fetch(batchSize, nats.MaxWait(1*time.Second))
if err != nil {
if err == nats.ErrConnectionClosed {
return
}
current := atomic.LoadUint64(&delivered)
if current >= total {
done2()
return
} else {
t.Logf("WARN: Timeout waiting for next message: %v", err)
}
continue
}
for _, msg := range recvd {
queue = append(queue, msg)
queues.Store(subject, queue)
}
atomic.AddUint64(&delivered, uint64(len(recvd)))
break
}
}
}()
}

// Wait until context is canceled after receiving all messages.
<-ctx2.Done()

if delivered < total {
t.Fatalf("Expected %v, got: %v", total, delivered)
}

select {
case <-ctx.Done():
case err := <-errCh:
if err != nil {
t.Fatalf("Unexpected error with multiple pull subscribers: %v", err)
}
}

var (
gotNoMessages bool
count = 0
)
queues.Range(func(k, v interface{}) bool {
msgs := v.([]*nats.Msg)
count += len(msgs)

if len(msgs) == 0 {
gotNoMessages = true
return false
}
return true
})

if gotNoMessages {
t.Error("Expected all pull subscribers to receive some messages")
}
}

func TestJetStream_ClusterReconnect(t *testing.T) {
t.Skip("This test need to be revisited")
n := 3
Expand Down

0 comments on commit 5be4ec5

Please sign in to comment.