From 5be4ec526b431dfd68b2d2f6e2ccf3a01e7590cc Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Mon, 2 May 2022 14:45:37 -0700 Subject: [PATCH] js: handle 408 fetch requests pending status Skip 408 errors thrown to client no wait + expires requests Signed-off-by: Waldemar Quevedo --- js.go | 36 ++++++++---- test/js_test.go | 142 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+), 11 deletions(-) diff --git a/js.go b/js.go index 7001220e1..4013a975e 100644 --- a/js.go +++ b/js.go @@ -2414,12 +2414,20 @@ func PullMaxWaiting(n int) SubOpt { }) } -var errNoMessages = errors.New("nats: no messages") +var ( + // errNoMessages is an error that a Fetch request using no_wait can receive to signal + // that there are no more messages available. + errNoMessages = errors.New("nats: no messages") + + // errRequestsPending is an error that represents a sub.Fetch requests that was using + // no_wait and expires time got discarded by the server. + errRequestsPending = errors.New("nats: requests pending") +) // Returns if the given message is a user message or not, and if // `checkSts` is true, returns appropriate error based on the // content of the status (404, etc..) -func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) { +func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) { // Assume user message usrMsg = true @@ -2448,11 +2456,17 @@ func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) { // 404 indicates that there are no messages. err = errNoMessages case reqTimeoutSts: - // Older servers may send a 408 when a request in the server was expired - // and interest is still found, which will be the case for our - // implementation. Regardless, ignore 408 errors until receiving at least - // one message. - err = ErrTimeout + // In case of a fetch request with no wait request and expires time, + // need to skip 408 errors and retry. + if isNoWait { + err = errRequestsPending + } else { + // Older servers may send a 408 when a request in the server was expired + // and interest is still found, which will be the case for our + // implementation. Regardless, ignore 408 errors until receiving at least + // one message when making requests without no_wait. + err = ErrTimeout + } default: err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) } @@ -2567,7 +2581,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { // or status message, however, we don't care about values of status // messages at this point in the Fetch() call, so checkMsg can't // return an error. - if usrMsg, _ := checkMsg(msg, false); usrMsg { + if usrMsg, _ := checkMsg(msg, false, false); usrMsg { msgs = append(msgs, msg) } } @@ -2610,11 +2624,11 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { if err == nil { var usrMsg bool - usrMsg, err = checkMsg(msg, true) + usrMsg, err = checkMsg(msg, true, noWait) if err == nil && usrMsg { msgs = append(msgs, msg) - } else if noWait && (err == errNoMessages) && len(msgs) == 0 { - // If we have a 404 for our "no_wait" request and have + } else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 { + // If we have a 404/408 for our "no_wait" request and have // not collected any message, then resend request to // wait this time. noWait = false diff --git a/test/js_test.go b/test/js_test.go index 132bc2a8a..0ed0a5e1f 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -4382,6 +4382,15 @@ func TestJetStream_ClusterMultipleSubscribe(t *testing.T) { } withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultiplePullSubscribe) }) + + t.Run(fmt.Sprintf("psub n=%d r=%d multi fetch", n, r), func(t *testing.T) { + name := fmt.Sprintf("PFSUBN%d%d", n, r) + stream := &nats.StreamConfig{ + Name: name, + Replicas: n, + } + withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleFetchPullSubscribe) + }) } } } @@ -4543,6 +4552,139 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr } } +func testJetStream_ClusterMultipleFetchPullSubscribe(t *testing.T, subject string, srvs ...*jsServer) { + srv := srvs[0] + nc, js := jsClient(t, srv.Server) + defer nc.Close() + + var wg sync.WaitGroup + ctx, done := context.WithTimeout(context.Background(), 5*time.Second) + defer done() + + // Setup a number of subscriptions with different inboxes that will be + // fetching the messages in parallel. + nsubs := 4 + subs := make([]*nats.Subscription, nsubs) + errCh := make(chan error, nsubs) + var queues sync.Map + for i := 0; i < nsubs; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + var sub *nats.Subscription + var err error + sub, err = js.PullSubscribe(subject, "shared") + if err != nil { + errCh <- err + } else { + subs[n] = sub + queues.Store(sub.Subject, make([]*nats.Msg, 0)) + } + }(i) + } + + // Publishing of messages happens after the subscriptions are ready. + // The subscribers will be fetching messages while these are being + // produced so sometimes there are not going to be messages available. + wg.Wait() + var ( + total uint64 = 100 + delivered uint64 + batchSize = 2 + ) + go func() { + for i := 0; i < int(total); i++ { + js.Publish(subject, []byte(fmt.Sprintf("n:%v", i))) + time.Sleep(1 * time.Millisecond) + } + }() + + ctx2, done2 := context.WithTimeout(ctx, 3*time.Second) + defer done2() + + for _, psub := range subs { + if psub == nil { + continue + } + sub := psub + subject := sub.Subject + v, _ := queues.Load(sub.Subject) + queue := v.([]*nats.Msg) + go func() { + for { + select { + case <-ctx2.Done(): + return + default: + } + + if current := atomic.LoadUint64(&delivered); current >= total { + done2() + return + } + + // Wait until all messages have been consumed. + for attempt := 0; attempt < 4; attempt++ { + recvd, err := sub.Fetch(batchSize, nats.MaxWait(1*time.Second)) + if err != nil { + if err == nats.ErrConnectionClosed { + return + } + current := atomic.LoadUint64(&delivered) + if current >= total { + done2() + return + } else { + t.Logf("WARN: Timeout waiting for next message: %v", err) + } + continue + } + for _, msg := range recvd { + queue = append(queue, msg) + queues.Store(subject, queue) + } + atomic.AddUint64(&delivered, uint64(len(recvd))) + break + } + } + }() + } + + // Wait until context is canceled after receiving all messages. + <-ctx2.Done() + + if delivered < total { + t.Fatalf("Expected %v, got: %v", total, delivered) + } + + select { + case <-ctx.Done(): + case err := <-errCh: + if err != nil { + t.Fatalf("Unexpected error with multiple pull subscribers: %v", err) + } + } + + var ( + gotNoMessages bool + count = 0 + ) + queues.Range(func(k, v interface{}) bool { + msgs := v.([]*nats.Msg) + count += len(msgs) + + if len(msgs) == 0 { + gotNoMessages = true + return false + } + return true + }) + + if gotNoMessages { + t.Error("Expected all pull subscribers to receive some messages") + } +} + func TestJetStream_ClusterReconnect(t *testing.T) { t.Skip("This test need to be revisited") n := 3