diff --git a/go_test.mod b/go_test.mod index 9de5e151e..a879c15a2 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.17 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4 + github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 8fa507d3b..ece6fa6e1 100644 --- a/go_test.sum +++ b/go_test.sum @@ -20,8 +20,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4 h1:hAhXb/iuQNPZe9y0wFift4hkiquMlNF5WuMhpoMlUUM= -github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg= +github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116 h1:NoZ5jkLgMNijnDh96QENq4M06AF34GXlvaYtHGXm/Jk= +github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg= github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/js.go b/js.go index fc2fd480c..172779edb 100644 --- a/js.go +++ b/js.go @@ -169,6 +169,9 @@ const ( // apiMsgGetT is the endpoint to perform a direct get of a message. apiDirectMsgGetT = "DIRECT.GET.%s" + // apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject. + apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s" + // apiMsgDeleteT is the endpoint to remove a message. apiMsgDeleteT = "STREAM.MSG.DELETE.%s" diff --git a/jsm.go b/jsm.go index 7018d0e52..faeacabb7 100644 --- a/jsm.go +++ b/jsm.go @@ -887,6 +887,20 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt } var apiSubj string + + doDirectGetLastBySubject := o.directGet && mreq.LastFor != "" + + if doDirectGetLastBySubject { + apiSubj = apiDirectMsgGetLastBySubjectT + dsSubj := js.apiSubj(fmt.Sprintf(apiSubj, name, mreq.LastFor)) + r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil) + if err != nil { + return nil, err + } + + return convertDirectGetMsgResponseToMsg(name, r) + } + if o.directGet { apiSubj = apiDirectMsgGetT mreq.NextFor = o.directNextFor diff --git a/test/js_test.go b/test/js_test.go index 26415e118..e5c229ae5 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1408,7 +1408,6 @@ func TestJetStreamManagement(t *testing.T) { expected.MaxDeliver = 1 expected.SampleFrequency = "30" expected.MaxAckPending = 10 - expected.MaxWaiting = 20 expected.HeadersOnly = true expected.MaxRequestBatch = 10 expected.MaxRequestExpires = 2 * time.Second @@ -7409,6 +7408,18 @@ func TestJetStreamDirectGetMsg(t *testing.T) { if _, err := js.GetMsg("DGM", 0, nats.DirectGet()); err == nil || !strings.Contains(err.Error(), "Empty Request") { t.Fatalf("Unexpected error: %v", err) } + + // Test direct get by subject by trying to get 'bar' directly + r, err = js.GetLastMsg("DGM", "bar", nats.DirectGet()) + if err != nil { + t.Fatalf("Error getting message: %v", err) + } + if r.Subject != "bar" { + t.Fatalf("expected subject to be 'bar', got: %v", r.Subject) + } + if string(r.Data) != "d" { + t.Fatalf("expected data to be 'd', got: %v", string(r.Data)) + } } func TestJetStreamConsumerReplicasOption(t *testing.T) {