Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch completed status to Pull Consumers #3822

Merged
merged 1 commit into from Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions server/consumer.go
Expand Up @@ -38,6 +38,9 @@ const (
JSPullRequestPendingBytes = "Nats-Pending-Bytes"
)

// Headers sent when batch size was completed, but there were remaining bytes.
const JsPullRequestRemainingBytesT = "NATS/1.0 409 Batch Completed\r\n%s: %d\r\n%s: %d\r\n\r\n"

type ConsumerInfo struct {
Stream string `json:"stream_name"`
Name string `json:"name"`
Expand Down Expand Up @@ -3224,6 +3227,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
ackReply string
delay time.Duration
sz int
wrn, wrb int
)
o.mu.Lock()
// consumer is closed when mset is set to nil.
Expand Down Expand Up @@ -3277,6 +3281,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
if o.isPushMode() {
dsubj = o.dsubj
} else if wr := o.nextWaiting(sz); wr != nil {
wrn, wrb = wr.n, wr.b
dsubj = wr.reply
if done := wr.recycleIfDone(); done && o.node != nil {
o.removeClusterPendingRequest(dsubj)
Expand Down Expand Up @@ -3330,6 +3335,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// Do actual delivery.
o.deliverMsg(dsubj, ackReply, pmsg, dc, rp)

// If given request fulfilled batch size, but there are still pending bytes, send information about it.
if wrn <= 0 && wrb > 0 {
o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, []byte(fmt.Sprintf(JsPullRequestRemainingBytesT, JSPullRequestPendingMsgs, wrn, JSPullRequestPendingBytes, wrb)), nil, nil, 0))
}
// Reset our idle heartbeat timer if set.
if hb != nil {
hb.Reset(hbd)
Expand Down
71 changes: 69 additions & 2 deletions server/jetstream_test.go
Expand Up @@ -16808,25 +16808,39 @@ func TestJetStreamPullMaxBytes(t *testing.T) {
req = &JSApiConsumerGetNextRequest{Batch: 1, MaxBytes: 10_000_000, NoWait: true}
jreq, _ = json.Marshal(req)
nc.PublishRequest(subj, reply, jreq)
checkSubsPending(t, sub, 1)
// we expect two messages, as the second one should be `Batch Completed` status.
checkSubsPending(t, sub, 2)

// first one is message from the stream.
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
// second one is the status.
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
if v := m.Header.Get("Description"); v != "Batch Completed" {
t.Fatalf("Expected Batch Completed, got: %s", v)
}
checkSubsPending(t, sub, 0)

// Same but with batch > 1
req = &JSApiConsumerGetNextRequest{Batch: 5, MaxBytes: 10_000_000, NoWait: true}
jreq, _ = json.Marshal(req)
nc.PublishRequest(subj, reply, jreq)
checkSubsPending(t, sub, 5)
// 6, not 5, as 6th is the status.
checkSubsPending(t, sub, 6)
for i := 0; i < 5; i++ {
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
}
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
if v := m.Header.Get("Description"); v != "Batch Completed" {
t.Fatalf("Expected Batch Completed, got: %s", v)
}
checkSubsPending(t, sub, 0)

// Now ask for large batch but make sure we are limited by batch size.
Expand Down Expand Up @@ -19333,3 +19347,56 @@ func TestJetStreamMsgBlkFailOnKernelFault(t *testing.T) {
friendlyBytes(si.Config.MaxBytes), friendlyBytes(int64(si.State.Bytes)))
}
}

func TestJetStreamPullConsumerBatchCompleted(t *testing.T) {

s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

msgSize := 128
msg := make([]byte, msgSize)
rand.Read(msg)

for i := 0; i < 10; i++ {
_, err := js.Publish("foo", msg)
require_NoError(t, err)
}

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dur",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

req := JSApiConsumerGetNextRequest{Batch: 0, MaxBytes: 1024, Expires: 250 * time.Millisecond}

reqb, _ := json.Marshal(req)
sub := natsSubSync(t, nc, nats.NewInbox())
err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
require_NoError(t, err)

// Expect first message to arrive normally.
_, err = sub.NextMsg(time.Second * 1)
require_NoError(t, err)

// Second message should be info that batch is complete, but there were pending bytes.
pullMsg, err := sub.NextMsg(time.Second * 1)
require_NoError(t, err)

if v := pullMsg.Header.Get("Status"); v != "409" {
t.Fatalf("Expected 409, got: %s", v)
}
if v := pullMsg.Header.Get("Description"); v != "Batch Completed" {
t.Fatalf("Expected Batch Completed, got: %s", v)
}

}