From c52f9c2b42ca3b47dd6904af0ea886b39bea69a2 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Mon, 2 May 2022 14:45:37 -0700 Subject: [PATCH] js: handle 408 fetch requests pending status Signed-off-by: Waldemar Quevedo --- js.go | 32 ++++++++--- nats.go | 1 + test/js_test.go | 143 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+), 8 deletions(-) diff --git a/js.go b/js.go index 7001220e1..f12b86b7f 100644 --- a/js.go +++ b/js.go @@ -2414,7 +2414,15 @@ func PullMaxWaiting(n int) SubOpt { }) } -var errNoMessages = errors.New("nats: no messages") +var ( + // errNoMessages is an error that a Fetch request using no_wait can receive to signal + // that there are no more messages available. + errNoMessages = errors.New("nats: no messages") + + // errRequestsPending is an error that represents a sub.Fetch request that was using + // no_wait and expires time and got discarded by the server. + errRequestsPending = errors.New("nats: requests pending") +) // Returns if the given message is a user message or not, and if // `checkSts` is true, returns appropriate error based on the @@ -2448,11 +2456,18 @@ func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) { // 404 indicates that there are no messages. err = errNoMessages case reqTimeoutSts: - // Older servers may send a 408 when a request in the server was expired - // and interest is still found, which will be the case for our - // implementation. Regardless, ignore 408 errors until receiving at least - // one message. - err = ErrTimeout + // In case of a fetch request with no_wait and expires time, + // it will be a 408 error but with a Requests Pending description. + desc := msg.Header.Get(descrHdr) + if desc == reqPendingDesc { + err = errRequestsPending + } else { + // Older servers may send a 408 when a request in the server was expired + // and interest is still found, which will be the case for our + // implementation. 
Regardless, ignore 408 errors until receiving at least + // one message when making requests without no_wait. + err = ErrTimeout + } default: err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) } @@ -2558,7 +2573,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { // are no messages. msg, err = sub.nextMsgWithContext(ctx, true, false) if err != nil { - if err == errNoMessages { + // We skip both 404 and any type of 408 errors at this stage. + if err == errNoMessages || err == errRequestsPending { err = nil } break @@ -2613,7 +2629,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { usrMsg, err = checkMsg(msg, true) if err == nil && usrMsg { msgs = append(msgs, msg) - } else if noWait && (err == errNoMessages) && len(msgs) == 0 { + } else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 { // If we have a 404 for our "no_wait" request and have // not collected any message, then resend request to // wait this time. diff --git a/nats.go b/nats.go index 81332aed9..952329de6 100644 --- a/nats.go +++ b/nats.go @@ -3317,6 +3317,7 @@ const ( noResponders = "503" noMessagesSts = "404" reqTimeoutSts = "408" + reqPendingDesc = "Requests Pending" controlMsg = "100" statusLen = 3 // e.g. 
20x, 40x, 50x ) diff --git a/test/js_test.go b/test/js_test.go index 132bc2a8a..210a3b137 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -4382,6 +4382,15 @@ func TestJetStream_ClusterMultipleSubscribe(t *testing.T) { } withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultiplePullSubscribe) }) + + t.Run(fmt.Sprintf("psub n=%d r=%d multi fetch", n, r), func(t *testing.T) { + name := fmt.Sprintf("PFSUBN%d%d", n, r) + stream := &nats.StreamConfig{ + Name: name, + Replicas: n, + } + withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleFetchPullSubscribe) + }) } } } @@ -4543,6 +4552,140 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr } } +func testJetStream_ClusterMultipleFetchPullSubscribe(t *testing.T, subject string, srvs ...*jsServer) { + srv := srvs[0] + nc, js := jsClient(t, srv.Server) + defer nc.Close() + + var wg sync.WaitGroup + ctx, done := context.WithTimeout(context.Background(), 5*time.Second) + defer done() + + // Setup a number of subscriptions with different inboxes that will + // be fetching the messages in parallel. + nsubs := 4 + subs := make([]*nats.Subscription, nsubs) + errCh := make(chan error, nsubs) + var queues sync.Map + for i := 0; i < nsubs; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + var sub *nats.Subscription + var err error + sub, err = js.PullSubscribe(subject, "shared") + // fmt.Printf("%+v\n", sub.Subject) + if err != nil { + errCh <- err + } else { + subs[n] = sub + queues.Store(sub.Subject, make([]*nats.Msg, 0)) + } + }(i) + } + + // Publishing of messages happens after the subscriptions are ready. + // The subscribers will be fetching messages while these are being + // produced so sometimes there are not going to be messages available. 
+ wg.Wait() + var ( + total uint64 = 100 + delivered uint64 + batchSize = 2 + ) + go func() { + for i := 0; i < int(total); i++ { + js.Publish(subject, []byte(fmt.Sprintf("n:%v", i))) + time.Sleep(1 * time.Millisecond) + } + }() + + ctx2, done2 := context.WithTimeout(ctx, 3*time.Second) + defer done2() + + for _, psub := range subs { + if psub == nil { + continue + } + sub := psub + subject := sub.Subject + v, _ := queues.Load(sub.Subject) + queue := v.([]*nats.Msg) + go func() { + for { + select { + case <-ctx2.Done(): + return + default: + } + + if current := atomic.LoadUint64(&delivered); current >= total { + done2() + return + } + + // Wait until all messages have been consumed. + for attempt := 0; attempt < 4; attempt++ { + recvd, err := sub.Fetch(batchSize, nats.MaxWait(1*time.Second)) + if err != nil { + if err == nats.ErrConnectionClosed { + return + } + current := atomic.LoadUint64(&delivered) + if current >= total { + done2() + return + } else { + t.Logf("WARN: Timeout waiting for next message: %v", err) + } + continue + } + for _, msg := range recvd { + queue = append(queue, msg) + queues.Store(subject, queue) + } + atomic.AddUint64(&delivered, uint64(len(recvd))) + break + } + } + }() + } + + // Wait until context is cancelled after receiving all messages. + <-ctx2.Done() + + if delivered < total { + t.Fatalf("Expected %v, got: %v", total, delivered) + } + + select { + case <-ctx.Done(): + case err := <-errCh: + if err != nil { + t.Fatalf("Unexpected error with multiple pull subscribers: %v", err) + } + } + + var ( + gotNoMessages bool + count = 0 + ) + queues.Range(func(k, v interface{}) bool { + msgs := v.([]*nats.Msg) + count += len(msgs) + + if len(msgs) == 0 { + gotNoMessages = true + return false + } + return true + }) + + if gotNoMessages { + t.Error("Expected all pull subscribers to receive some messages") + } +} + func TestJetStream_ClusterReconnect(t *testing.T) { t.Skip("This test need to be revisited") n := 3