Skip to content

Commit

Permalink
js: handle 408 fetch requests pending status
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed May 2, 2022
1 parent 9c077d0 commit 2a30668
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 8 deletions.
32 changes: 24 additions & 8 deletions js.go
Expand Up @@ -2414,7 +2414,15 @@ func PullMaxWaiting(n int) SubOpt {
})
}

var errNoMessages = errors.New("nats: no messages")
var (
// errNoMessages is the error that a Fetch request using no_wait can receive
// to signal that there are no more messages available.
errNoMessages = errors.New("nats: no messages")

// errRequestsPending is the error for a sub.Fetch request that was using
// no_wait together with an expires time and got discarded by the server.
errRequestsPending = errors.New("nats: requests pending")
)

// Returns if the given message is a user message or not, and if
// `checkSts` is true, returns appropriate error based on the
Expand Down Expand Up @@ -2448,11 +2456,18 @@ func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) {
// 404 indicates that there are no messages.
err = errNoMessages
case reqTimeoutSts:
// Older servers may send a 408 when a request in the server was expired
// and interest is still found, which will be the case for our
// implementation. Regardless, ignore 408 errors until receiving at least
// one message.
err = ErrTimeout
// For a fetch request made with both no_wait and an expires time, the
// reply is a 408 status that carries a Requests Pending description.
desc := msg.Header.Get(descrHdr)
if desc == reqPendingDesc {
err = errRequestsPending
} else {
// Older servers may send a 408 when a request in the server was expired
// and interest is still found, which will be the case for our
// implementation. Regardless, ignore 408 errors until receiving at least
// one message when making requests without no_wait.
err = ErrTimeout
}
default:
err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
}
Expand Down Expand Up @@ -2558,7 +2573,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// are no messages.
msg, err = sub.nextMsgWithContext(ctx, true, false)
if err != nil {
if err == errNoMessages {
// We skip both 404 and 408 Requests Pending errors at this stage.
if err == errNoMessages || err == errRequestsPending {
err = nil
}
break
Expand Down Expand Up @@ -2613,7 +2629,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
usrMsg, err = checkMsg(msg, true)
if err == nil && usrMsg {
msgs = append(msgs, msg)
} else if noWait && (err == errNoMessages) && len(msgs) == 0 {
} else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 {
// If we have a 404 or a 408 Requests Pending for our "no_wait" request
// and have not collected any message, then resend request to
// wait this time.
Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -3317,6 +3317,7 @@ const (
noResponders = "503"
noMessagesSts = "404"
reqTimeoutSts = "408"
reqPendingDesc = "Requests Pending"
controlMsg = "100"
statusLen = 3 // e.g. 20x, 40x, 50x
)
Expand Down
143 changes: 143 additions & 0 deletions test/js_test.go
Expand Up @@ -4382,6 +4382,15 @@ func TestJetStream_ClusterMultipleSubscribe(t *testing.T) {
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultiplePullSubscribe)
})

t.Run(fmt.Sprintf("psub n=%d r=%d multi fetch", n, r), func(t *testing.T) {
name := fmt.Sprintf("PFSUBN%d%d", n, r)
stream := &nats.StreamConfig{
Name: name,
Replicas: n,
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleFetchPullSubscribe)
})
}
}
}
Expand Down Expand Up @@ -4543,6 +4552,140 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr
}
}

// testJetStream_ClusterMultipleFetchPullSubscribe creates several pull
// subscriptions bound to the same "shared" durable, publishes messages while
// the subscriptions fetch concurrently in small batches, and then checks that
// at least `total` messages were delivered overall and that every subscription
// received at least one message.
//
// subject is the subject to publish to and subscribe on; srvs are the cluster
// servers, of which only the first one is used for the client connection.
func testJetStream_ClusterMultipleFetchPullSubscribe(t *testing.T, subject string, srvs ...*jsServer) {
	srv := srvs[0]
	nc, js := jsClient(t, srv.Server)
	defer nc.Close()

	ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
	defer done()

	// Setup a number of subscriptions with different inboxes that will be
	// fetching the messages in parallel.
	nsubs := 4
	subs := make([]*nats.Subscription, nsubs)
	errCh := make(chan error, nsubs)
	var (
		wg     sync.WaitGroup
		queues sync.Map
	)
	for i := 0; i < nsubs; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			sub, err := js.PullSubscribe(subject, "shared")
			if err != nil {
				errCh <- err
				return
			}
			subs[n] = sub
			queues.Store(sub.Subject, make([]*nats.Msg, 0))
		}(i)
	}

	// Publishing of messages happens after the subscriptions are ready.
	// The subscribers will be fetching messages while these are being
	// produced so sometimes there are not going to be messages available.
	wg.Wait()

	// Fail fast if any subscription could not be created. The check is
	// non-blocking: errCh is buffered and only ever receives non-nil errors.
	select {
	case err := <-errCh:
		t.Fatalf("Unexpected error with multiple pull subscribers: %v", err)
	default:
	}

	var (
		total     uint64 = 100
		delivered uint64
		batchSize = 2
	)

	ctx2, done2 := context.WithTimeout(ctx, 3*time.Second)
	defer done2()

	go func() {
		for i := 0; i < int(total); i++ {
			// Stop producing once the consuming side is finished.
			select {
			case <-ctx2.Done():
				return
			default:
			}
			// Publish errors are deliberately not handled here; a lost
			// message is expected to show up as a failed delivered-count
			// check below.
			js.Publish(subject, []byte(fmt.Sprintf("n:%v", i)))
			time.Sleep(1 * time.Millisecond)
		}
	}()

	// fetchers tracks the fetching goroutines so that none of them can touch
	// t (t.Logf) after this function has returned.
	var fetchers sync.WaitGroup
	for _, psub := range subs {
		if psub == nil {
			continue
		}
		sub := psub
		key := sub.Subject
		v, _ := queues.Load(key)
		queue := v.([]*nats.Msg)

		fetchers.Add(1)
		go func() {
			defer fetchers.Done()
			for {
				select {
				case <-ctx2.Done():
					return
				default:
				}

				if atomic.LoadUint64(&delivered) >= total {
					done2()
					return
				}

				// Retry a few times until a batch is received.
				for attempt := 0; attempt < 4; attempt++ {
					recvd, err := sub.Fetch(batchSize, nats.MaxWait(1*time.Second))
					if err != nil {
						if err == nats.ErrConnectionClosed {
							return
						}
						if atomic.LoadUint64(&delivered) >= total {
							done2()
							return
						}
						// Do not keep retrying (or logging) once the
						// test is wrapping up.
						if ctx2.Err() != nil {
							return
						}
						t.Logf("WARN: Timeout waiting for next message: %v", err)
						continue
					}
					queue = append(queue, recvd...)
					queues.Store(key, queue)
					atomic.AddUint64(&delivered, uint64(len(recvd)))
					break
				}
			}
		}()
	}

	// Wait until context is cancelled after receiving all messages (or on
	// timeout), then for every fetcher to return before inspecting results.
	<-ctx2.Done()
	fetchers.Wait()

	if got := atomic.LoadUint64(&delivered); got < total {
		t.Fatalf("Expected %v, got: %v", total, got)
	}

	gotNoMessages := false
	queues.Range(func(_, v interface{}) bool {
		if len(v.([]*nats.Msg)) == 0 {
			gotNoMessages = true
			return false
		}
		return true
	})

	if gotNoMessages {
		t.Error("Expected all pull subscribers to receive some messages")
	}
}

func TestJetStream_ClusterReconnect(t *testing.T) {
t.Skip("This test need to be revisited")
n := 3
Expand Down

0 comments on commit 2a30668

Please sign in to comment.