Skip to content

Commit

Permalink
Merge pull request #1030 from nats-io/JMS-DirectGetLastBySubject
Browse files Browse the repository at this point in the history
Support Direct Gets by subject
  • Loading branch information
codegangsta committed Aug 3, 2022
2 parents 59d7d9c + e7a08b8 commit ec49000
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go_test.sum
Expand Up @@ -20,8 +20,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4 h1:hAhXb/iuQNPZe9y0wFift4hkiquMlNF5WuMhpoMlUUM=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116 h1:NoZ5jkLgMNijnDh96QENq4M06AF34GXlvaYtHGXm/Jk=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
3 changes: 3 additions & 0 deletions js.go
Expand Up @@ -169,6 +169,9 @@ const (
// apiMsgGetT is the endpoint to perform a direct get of a message.
apiDirectMsgGetT = "DIRECT.GET.%s"

// apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject.
apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s"

// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"

Expand Down
14 changes: 14 additions & 0 deletions jsm.go
Expand Up @@ -887,6 +887,20 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
}

var apiSubj string

doDirectGetLastBySubject := o.directGet && mreq.LastFor != ""

if doDirectGetLastBySubject {
apiSubj = apiDirectMsgGetLastBySubjectT
dsSubj := js.apiSubj(fmt.Sprintf(apiSubj, name, mreq.LastFor))
r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil)
if err != nil {
return nil, err
}

return convertDirectGetMsgResponseToMsg(name, r)
}

if o.directGet {
apiSubj = apiDirectMsgGetT
mreq.NextFor = o.directNextFor
Expand Down
13 changes: 12 additions & 1 deletion test/js_test.go
Expand Up @@ -1408,7 +1408,6 @@ func TestJetStreamManagement(t *testing.T) {
expected.MaxDeliver = 1
expected.SampleFrequency = "30"
expected.MaxAckPending = 10
expected.MaxWaiting = 20
expected.HeadersOnly = true
expected.MaxRequestBatch = 10
expected.MaxRequestExpires = 2 * time.Second
Expand Down Expand Up @@ -7409,6 +7408,18 @@ func TestJetStreamDirectGetMsg(t *testing.T) {
if _, err := js.GetMsg("DGM", 0, nats.DirectGet()); err == nil || !strings.Contains(err.Error(), "Empty Request") {
t.Fatalf("Unexpected error: %v", err)
}

// Test direct get by subject by trying to get 'bar' directly
r, err = js.GetLastMsg("DGM", "bar", nats.DirectGet())
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
if r.Subject != "bar" {
t.Fatalf("expected subject to be 'bar', got: %v", r.Subject)
}
if string(r.Data) != "d" {
t.Fatalf("expected data to be 'd', got: %v", string(r.Data))
}
}

func TestJetStreamConsumerReplicasOption(t *testing.T) {
Expand Down

0 comments on commit ec49000

Please sign in to comment.