Skip to content

Commit

Permalink
Merge pull request #3822 from nats-io/jarema/add-batch-completed
Browse files Browse the repository at this point in the history
Add batch completed status to Pull Consumers
  • Loading branch information
Jarema committed Jan 26, 2023
2 parents c2d3d9c + 34be180 commit 69aa55a
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 2 deletions.
9 changes: 9 additions & 0 deletions server/consumer.go
Expand Up @@ -38,6 +38,9 @@ const (
JSPullRequestPendingBytes = "Nats-Pending-Bytes"
)

// Headers sent when batch size was completed, but there were remaining bytes.
const JsPullRequestRemainingBytesT = "NATS/1.0 409 Batch Completed\r\n%s: %d\r\n%s: %d\r\n\r\n"

type ConsumerInfo struct {
Stream string `json:"stream_name"`
Name string `json:"name"`
Expand Down Expand Up @@ -3215,6 +3218,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
ackReply string
delay time.Duration
sz int
wrn, wrb int
)
o.mu.Lock()
// consumer is closed when mset is set to nil.
Expand Down Expand Up @@ -3268,6 +3272,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
if o.isPushMode() {
dsubj = o.dsubj
} else if wr := o.nextWaiting(sz); wr != nil {
wrn, wrb = wr.n, wr.b
dsubj = wr.reply
if done := wr.recycleIfDone(); done && o.node != nil {
o.removeClusterPendingRequest(dsubj)
Expand Down Expand Up @@ -3321,6 +3326,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// Do actual delivery.
o.deliverMsg(dsubj, ackReply, pmsg, dc, rp)

// If given request fulfilled batch size, but there are still pending bytes, send information about it.
if wrn <= 0 && wrb > 0 {
o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, []byte(fmt.Sprintf(JsPullRequestRemainingBytesT, JSPullRequestPendingMsgs, wrn, JSPullRequestPendingBytes, wrb)), nil, nil, 0))
}
// Reset our idle heartbeat timer if set.
if hb != nil {
hb.Reset(hbd)
Expand Down
71 changes: 69 additions & 2 deletions server/jetstream_test.go
Expand Up @@ -16594,25 +16594,39 @@ func TestJetStreamPullMaxBytes(t *testing.T) {
req = &JSApiConsumerGetNextRequest{Batch: 1, MaxBytes: 10_000_000, NoWait: true}
jreq, _ = json.Marshal(req)
nc.PublishRequest(subj, reply, jreq)
checkSubsPending(t, sub, 1)
// we expect two messages, as the second one should be `Batch Completed` status.
checkSubsPending(t, sub, 2)

// first one is message from the stream.
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
// second one is the status.
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
if v := m.Header.Get("Description"); v != "Batch Completed" {
t.Fatalf("Expected Batch Completed, got: %s", v)
}
checkSubsPending(t, sub, 0)

// Same but with batch > 1
req = &JSApiConsumerGetNextRequest{Batch: 5, MaxBytes: 10_000_000, NoWait: true}
jreq, _ = json.Marshal(req)
nc.PublishRequest(subj, reply, jreq)
checkSubsPending(t, sub, 5)
// 6, not 5, as 6th is the status.
checkSubsPending(t, sub, 6)
for i := 0; i < 5; i++ {
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
}
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
if v := m.Header.Get("Description"); v != "Batch Completed" {
t.Fatalf("Expected Batch Completed, got: %s", v)
}
checkSubsPending(t, sub, 0)

// Now ask for large batch but make sure we are limited by batch size.
Expand Down Expand Up @@ -19119,3 +19133,56 @@ func TestJetStreamMsgBlkFailOnKernelFault(t *testing.T) {
friendlyBytes(si.Config.MaxBytes), friendlyBytes(int64(si.State.Bytes)))
}
}

func TestJetStreamPullConsumerBatchCompleted(t *testing.T) {

s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

msgSize := 128
msg := make([]byte, msgSize)
rand.Read(msg)

for i := 0; i < 10; i++ {
_, err := js.Publish("foo", msg)
require_NoError(t, err)
}

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dur",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

req := JSApiConsumerGetNextRequest{Batch: 0, MaxBytes: 1024, Expires: 250 * time.Millisecond}

reqb, _ := json.Marshal(req)
sub := natsSubSync(t, nc, nats.NewInbox())
err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
require_NoError(t, err)

// Expect first message to arrive normally.
_, err = sub.NextMsg(time.Second * 1)
require_NoError(t, err)

// Second message should be info that batch is complete, but there were pending bytes.
pullMsg, err := sub.NextMsg(time.Second * 1)
require_NoError(t, err)

if v := pullMsg.Header.Get("Status"); v != "409" {
t.Fatalf("Expected 409, got: %s", v)
}
if v := pullMsg.Header.Get("Description"); v != "Batch Completed" {
t.Fatalf("Expected Batch Completed, got: %s", v)
}

}

0 comments on commit 69aa55a

Please sign in to comment.