Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

js: handle 408 fetch requests pending status #967

Merged
merged 1 commit into from May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 25 additions & 11 deletions js.go
Expand Up @@ -2414,12 +2414,20 @@ func PullMaxWaiting(n int) SubOpt {
})
}

var errNoMessages = errors.New("nats: no messages")
var (
// errNoMessages is an error that a Fetch request using no_wait can receive to signal
// that there are no more messages available.
errNoMessages = errors.New("nats: no messages")

// errRequestsPending is an error that represents a sub.Fetch requests that was using
// no_wait and expires time got discarded by the server.
errRequestsPending = errors.New("nats: requests pending")
)

// Returns if the given message is a user message or not, and if
// `checkSts` is true, returns appropriate error based on the
// content of the status (404, etc..)
func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) {
func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) {
// Assume user message
usrMsg = true

Expand Down Expand Up @@ -2448,11 +2456,17 @@ func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) {
// 404 indicates that there are no messages.
err = errNoMessages
case reqTimeoutSts:
// Older servers may send a 408 when a request in the server was expired
// and interest is still found, which will be the case for our
// implementation. Regardless, ignore 408 errors until receiving at least
// one message.
err = ErrTimeout
// In case of a fetch request with no wait request and expires time,
// need to skip 408 errors and retry.
if isNoWait {
err = errRequestsPending
} else {
// Older servers may send a 408 when a request in the server was expired
// and interest is still found, which will be the case for our
// implementation. Regardless, ignore 408 errors until receiving at least
// one message when making requests without no_wait.
err = ErrTimeout
}
default:
err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
}
Expand Down Expand Up @@ -2567,7 +2581,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// or status message, however, we don't care about values of status
// messages at this point in the Fetch() call, so checkMsg can't
// return an error.
if usrMsg, _ := checkMsg(msg, false); usrMsg {
if usrMsg, _ := checkMsg(msg, false, false); usrMsg {
msgs = append(msgs, msg)
}
}
Expand Down Expand Up @@ -2610,11 +2624,11 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if err == nil {
var usrMsg bool

usrMsg, err = checkMsg(msg, true)
usrMsg, err = checkMsg(msg, true, noWait)
if err == nil && usrMsg {
msgs = append(msgs, msg)
} else if noWait && (err == errNoMessages) && len(msgs) == 0 {
// If we have a 404 for our "no_wait" request and have
} else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 {
// If we have a 404/408 for our "no_wait" request and have
// not collected any message, then resend request to
// wait this time.
noWait = false
Expand Down
142 changes: 142 additions & 0 deletions test/js_test.go
Expand Up @@ -4382,6 +4382,15 @@ func TestJetStream_ClusterMultipleSubscribe(t *testing.T) {
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultiplePullSubscribe)
})

t.Run(fmt.Sprintf("psub n=%d r=%d multi fetch", n, r), func(t *testing.T) {
name := fmt.Sprintf("PFSUBN%d%d", n, r)
stream := &nats.StreamConfig{
Name: name,
Replicas: n,
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleFetchPullSubscribe)
})
}
}
}
Expand Down Expand Up @@ -4543,6 +4552,139 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr
}
}

func testJetStream_ClusterMultipleFetchPullSubscribe(t *testing.T, subject string, srvs ...*jsServer) {
srv := srvs[0]
nc, js := jsClient(t, srv.Server)
defer nc.Close()

var wg sync.WaitGroup
ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
defer done()

// Setup a number of subscriptions with different inboxes that will be
// fetching the messages in parallel.
nsubs := 4
subs := make([]*nats.Subscription, nsubs)
errCh := make(chan error, nsubs)
var queues sync.Map
for i := 0; i < nsubs; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
var sub *nats.Subscription
var err error
sub, err = js.PullSubscribe(subject, "shared")
if err != nil {
errCh <- err
} else {
subs[n] = sub
queues.Store(sub.Subject, make([]*nats.Msg, 0))
}
}(i)
}

// Publishing of messages happens after the subscriptions are ready.
// The subscribers will be fetching messages while these are being
// produced so sometimes there are not going to be messages available.
wg.Wait()
var (
total uint64 = 100
delivered uint64
batchSize = 2
)
go func() {
for i := 0; i < int(total); i++ {
js.Publish(subject, []byte(fmt.Sprintf("n:%v", i)))
time.Sleep(1 * time.Millisecond)
}
}()

ctx2, done2 := context.WithTimeout(ctx, 3*time.Second)
defer done2()

for _, psub := range subs {
if psub == nil {
continue
}
sub := psub
subject := sub.Subject
v, _ := queues.Load(sub.Subject)
queue := v.([]*nats.Msg)
go func() {
for {
select {
case <-ctx2.Done():
return
default:
}

if current := atomic.LoadUint64(&delivered); current >= total {
done2()
return
}

// Wait until all messages have been consumed.
for attempt := 0; attempt < 4; attempt++ {
recvd, err := sub.Fetch(batchSize, nats.MaxWait(1*time.Second))
if err != nil {
if err == nats.ErrConnectionClosed {
return
}
current := atomic.LoadUint64(&delivered)
if current >= total {
done2()
return
} else {
t.Logf("WARN: Timeout waiting for next message: %v", err)
}
continue
}
for _, msg := range recvd {
queue = append(queue, msg)
queues.Store(subject, queue)
}
atomic.AddUint64(&delivered, uint64(len(recvd)))
break
}
}
}()
}

// Wait until context is canceled after receiving all messages.
<-ctx2.Done()

if delivered < total {
t.Fatalf("Expected %v, got: %v", total, delivered)
}

select {
case <-ctx.Done():
case err := <-errCh:
if err != nil {
t.Fatalf("Unexpected error with multiple pull subscribers: %v", err)
}
}

var (
gotNoMessages bool
count = 0
)
queues.Range(func(k, v interface{}) bool {
msgs := v.([]*nats.Msg)
count += len(msgs)

if len(msgs) == 0 {
gotNoMessages = true
return false
}
return true
})

if gotNoMessages {
t.Error("Expected all pull subscribers to receive some messages")
}
}

func TestJetStream_ClusterReconnect(t *testing.T) {
t.Skip("This test need to be revisited")
n := 3
Expand Down