Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Direct Gets by subject #1030

Merged
merged 6 commits into from Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go_test.sum
Expand Up @@ -20,8 +20,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4 h1:hAhXb/iuQNPZe9y0wFift4hkiquMlNF5WuMhpoMlUUM=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116 h1:NoZ5jkLgMNijnDh96QENq4M06AF34GXlvaYtHGXm/Jk=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
3 changes: 3 additions & 0 deletions js.go
Expand Up @@ -169,6 +169,9 @@ const (
// apiMsgGetT is the endpoint to perform a direct get of a message.
apiDirectMsgGetT = "DIRECT.GET.%s"

// apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject.
apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s"

// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"

Expand Down
14 changes: 14 additions & 0 deletions jsm.go
Expand Up @@ -887,6 +887,20 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
}

var apiSubj string

doDirectGetLastBySubject := o.directGet && mreq.LastFor != ""

if doDirectGetLastBySubject {
apiSubj = apiDirectMsgGetLastBySubjectT
dsSubj := js.apiSubj(fmt.Sprintf(apiSubj, name, mreq.LastFor))
r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil)
if err != nil {
return nil, err
}

return convertDirectGetMsgResponseToMsg(name, r)
}

if o.directGet {
apiSubj = apiDirectMsgGetT
mreq.NextFor = o.directNextFor
Expand Down
13 changes: 12 additions & 1 deletion test/js_test.go
Expand Up @@ -1408,7 +1408,6 @@ func TestJetStreamManagement(t *testing.T) {
expected.MaxDeliver = 1
expected.SampleFrequency = "30"
expected.MaxAckPending = 10
expected.MaxWaiting = 20
expected.HeadersOnly = true
expected.MaxRequestBatch = 10
expected.MaxRequestExpires = 2 * time.Second
Expand Down Expand Up @@ -7409,6 +7408,18 @@ func TestJetStreamDirectGetMsg(t *testing.T) {
if _, err := js.GetMsg("DGM", 0, nats.DirectGet()); err == nil || !strings.Contains(err.Error(), "Empty Request") {
t.Fatalf("Unexpected error: %v", err)
}

// Test direct get by subject by trying to get 'bar' directly
r, err = js.GetLastMsg("DGM", "bar", nats.DirectGet())
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
if r.Subject != "bar" {
t.Fatalf("expected subject to be 'bar', got: %v", r.Subject)
}
if string(r.Data) != "d" {
t.Fatalf("expected data to be 'd', got: %v", string(r.Data))
}
}

func TestJetStreamConsumerReplicasOption(t *testing.T) {
Expand Down