Skip to content

Commit

Permalink
Merge pull request #3942 from nats-io/jarema/fix-pull-consumer-timeout
Browse files Browse the repository at this point in the history
Fix Pull Consumer not sending request timeout
  • Loading branch information
Jarema committed Mar 3, 2023
2 parents 6db06d1 + df282a2 commit c1af585
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 0 deletions.
11 changes: 11 additions & 0 deletions server/consumer.go
Expand Up @@ -2781,6 +2781,17 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
} else if o.srv.gateway.enabled && o.srv.hasGatewayInterest(wr.acc.Name, wr.interest) {
return o.waiting.pop()
}
} else {
// We do check for expiration in `processWaiting`, but it is possible to hit the expiry here, and not there.
hdr := []byte(fmt.Sprintf("NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
o.waiting.removeCurrent()
if o.node != nil {
o.removeClusterPendingRequest(wr.reply)
}
wr.recycle()
continue

}
if wr.interest != wr.reply {
const intExpT = "NATS/1.0 408 Interest Expired\r\n%s: %d\r\n%s: %d\r\n\r\n"
Expand Down
57 changes: 57 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -16732,6 +16732,63 @@ func TestJetStreamDisabledHealthz(t *testing.T) {
t.Fatalf("Expected healthz to return error if JetStream is disabled, got status: %s", hs.Status)
}

func TestJetStreamPullTimeout(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "pr",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

const numMessages = 1000
// Send messages in small intervals.
go func() {
for i := 0; i < numMessages; i++ {
time.Sleep(time.Millisecond * 10)
sendStreamMsg(t, nc, "TEST", "data")
}
}()

// Prepare manual Pull Request.
req := &JSApiConsumerGetNextRequest{Batch: 200, NoWait: false, Expires: time.Millisecond * 100}
jreq, _ := json.Marshal(req)

subj := fmt.Sprintf(JSApiRequestNextT, "TEST", "pr")
reply := "_pr_"
var got atomic.Int32
nc.PublishRequest(subj, reply, jreq)

// Manually subscribe to inbox subject and send new request only if we get `408 Request Timeout`.
sub, _ := nc.Subscribe(reply, func(msg *nats.Msg) {
if msg.Header.Get("Status") == "408" && msg.Header.Get("Description") == "Request Timeout" {
nc.PublishRequest(subj, reply, jreq)
nc.Flush()
} else {
got.Add(1)
msg.Ack()
}
})
defer sub.Unsubscribe()

// Check if we're not stuck.
checkFor(t, time.Second*30, time.Second*1, func() error {
if got.Load() < int32(numMessages) {
return fmt.Errorf("expected %d messages", numMessages)
}
return nil
})
}

func TestJetStreamPullMaxBytes(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
Expand Down

0 comments on commit c1af585

Please sign in to comment.