Skip to content

Commit

Permalink
[FIXED] Fetch() could return immediately with a timeout error
Browse files Browse the repository at this point in the history
This would happen when pull requests would have filled the waiting
queue in the JetStream consumer and a 408 status was returned.

Resolves #809

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Sep 2, 2021
1 parent 51262a8 commit a975ef9
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 43 deletions.
59 changes: 29 additions & 30 deletions js.go
Expand Up @@ -2078,7 +2078,11 @@ func checkMsg(msg *Msg, checkSts bool) (usrMsg bool, err error) {
// 404 indicates that there are no messages.
err = errNoMessages
case reqTimeoutSts:
err = ErrTimeout
// Older servers may send a 408 when a request in the server was expired
// and interest is still found, which will be the case for our
// implementation. Regardless, ignore 408 errors, the caller will
// go back to wait for the next message.
err = nil
default:
err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
}
Expand All @@ -2090,6 +2094,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if sub == nil {
return nil, ErrBadSubscription
}
if batch < 1 {
return nil, ErrInvalidArg
}

var o pullOpts
for _, opt := range opts {
Expand Down Expand Up @@ -2182,19 +2189,31 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if err == nil && len(msgs) < batch {
// For batch real size of 1, it does not make sense to set no_wait in
// the request.
batchSize := batch - len(msgs)
noWait := batchSize > 1
nr := &nextRequest{Batch: batchSize, NoWait: noWait}
req, _ := json.Marshal(nr)
noWait := batch-len(msgs) > 1
var nr nextRequest

err = nc.PublishRequest(nms, rply, req)
for err == nil && len(msgs) < batch {
sendReq := func() error {
ttl -= time.Since(start)
if ttl < 0 {
ttl = 0
// At this point consider that we have timed-out
return context.DeadlineExceeded
}
// Make our request expiration a bit shorter than the current timeout.
expires := ttl
if ttl >= 20*time.Millisecond {
expires = ttl - 10*time.Millisecond
}

// Ask for next message and waits if there are no messages
nr.Batch = batch - len(msgs)
nr.Expires = expires
nr.NoWait = noWait
req, _ := json.Marshal(nr)
return nc.PublishRequest(nms, rply, req)
}

err = sendReq()
for err == nil && len(msgs) < batch {
// Ask for next message and wait if there are no messages
msg, err = sub.nextMsgWithContext(ctx, true, true)
if err == nil {
var usrMsg bool
Expand All @@ -2207,27 +2226,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// not collected any message, then resend request to
// wait this time.
noWait = false

ttl -= time.Since(start)
if ttl < 0 {
// At this point consider that we have timed-out
err = context.DeadlineExceeded
break
}

// Make our request expiration a bit shorter than the
// current timeout.
expires := ttl
if ttl >= 20*time.Millisecond {
expires = ttl - 10*time.Millisecond
}

nr.Batch = batch - len(msgs)
nr.Expires = expires
nr.NoWait = false
req, _ = json.Marshal(nr)

err = nc.PublishRequest(nms, rply, req)
err = sendReq()
}
}
}
Expand Down
58 changes: 58 additions & 0 deletions js_test.go
Expand Up @@ -771,3 +771,61 @@ func TestJetStreamFlowControlStalled(t *testing.T) {
t.Fatal("Library did not send FC")
}
}

func TestJetStreamExpiredPullRequests(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sub, err := js.PullSubscribe("foo", "bar", PullMaxWaiting(2))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
// Make sure that we reject batch < 1
if _, err := sub.Fetch(0); err == nil {
t.Fatal("Expected error, did not get one")
}
if _, err := sub.Fetch(-1); err == nil {
t.Fatal("Expected error, did not get one")
}

// Send 2 fetch requests
for i := 0; i < 2; i++ {
if _, err = sub.Fetch(1, MaxWait(15*time.Millisecond)); err == nil {
t.Fatalf("Expected error, got none")
}
}
// Wait before the above expire
time.Sleep(50 * time.Millisecond)
batches := []int{1, 10}
for _, bsz := range batches {
start := time.Now()
_, err = sub.Fetch(bsz, MaxWait(250*time.Millisecond))
dur := time.Since(start)
if err == nil || dur < 50*time.Millisecond {
t.Fatalf("Expected error and wait for 250ms, got err=%v and dur=%v", err, dur)
}
}
}
26 changes: 13 additions & 13 deletions test/js_test.go
Expand Up @@ -842,17 +842,17 @@ func TestJetStreamAckPending_Pull(t *testing.T) {
for time.Now().Before(timeout) {
ms, err := sub.Fetch(ackPendingLimit)
if err != nil || (ms != nil && len(ms) == 0) {
time.Sleep(50 * time.Millisecond)
continue
}

msgs = append(msgs, ms...)
if len(msgs) >= expected {
break
}
time.Sleep(10 * time.Millisecond)
}
if len(msgs) < expected {
t.Errorf("Expected %v, got %v", expected, pending)
t.Fatalf("Expected %v, got %v", expected, pending)
}

info, err := sub.ConsumerInfo()
Expand All @@ -863,37 +863,37 @@ func TestJetStreamAckPending_Pull(t *testing.T) {
got := info.NumRedelivered
expected = 3
if got < expected {
t.Errorf("Expected %v, got: %v", expected, got)
t.Fatalf("Expected %v, got: %v", expected, got)
}

got = info.NumAckPending
expected = 3
if got < expected {
t.Errorf("Expected %v, got: %v", expected, got)
t.Fatalf("Expected %v, got: %v", expected, got)
}

got = info.NumWaiting
expected = 0
if got != expected {
t.Errorf("Expected %v, got: %v", expected, got)
t.Fatalf("Expected %v, got: %v", expected, got)
}

got = int(info.NumPending)
expected = 0
if got != expected {
t.Errorf("Expected %v, got: %v", expected, got)
t.Fatalf("Expected %v, got: %v", expected, got)
}

got = info.Config.MaxAckPending
expected = 3
if got != expected {
t.Errorf("Expected %v, got %v", expected, pending)
t.Fatalf("Expected %v, got %v", expected, pending)
}

got = info.Config.MaxDeliver
expected = 5
if got != expected {
t.Errorf("Expected %v, got %v", expected, pending)
t.Fatalf("Expected %v, got %v", expected, pending)
}

acks := map[int]int{}
Expand All @@ -913,34 +913,34 @@ func TestJetStreamAckPending_Pull(t *testing.T) {

meta, err := m.Metadata()
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
acks[int(meta.Sequence.Stream)]++

if ackPending != 0 {
ackPending--
}
if int(meta.NumPending) != ackPending {
t.Errorf("Expected %v, got %v", ackPending, meta.NumPending)
t.Fatalf("Expected %v, got %v", ackPending, meta.NumPending)
}
}

got = len(acks)
expected = 3
if got != expected {
t.Errorf("Expected %v, got %v", expected, got)
t.Fatalf("Expected %v, got %v", expected, got)
}

expected = 5
for _, got := range acks {
if got != expected {
t.Errorf("Expected %v, got %v", expected, got)
t.Fatalf("Expected %v, got %v", expected, got)
}
}

_, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond))
if err != nats.ErrTimeout {
t.Errorf("Expected timeout, got: %v", err)
t.Fatalf("Expected timeout, got: %v", err)
}
}

Expand Down

0 comments on commit a975ef9

Please sign in to comment.