From 1eb8a22c214730f761fbd60f50a16979e153e74c Mon Sep 17 00:00:00 2001 From: Jeremy Saenz Date: Wed, 3 Aug 2022 09:50:45 -0700 Subject: [PATCH 1/6] Started implementation of direct gets by subject --- js.go | 3 +++ jsm.go | 14 ++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/js.go b/js.go index fc2fd480c..172779edb 100644 --- a/js.go +++ b/js.go @@ -169,6 +169,9 @@ const ( // apiMsgGetT is the endpoint to perform a direct get of a message. apiDirectMsgGetT = "DIRECT.GET.%s" + // apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject. + apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s" + // apiMsgDeleteT is the endpoint to remove a message. apiMsgDeleteT = "STREAM.MSG.DELETE.%s" diff --git a/jsm.go b/jsm.go index 7018d0e52..1b36eab87 100644 --- a/jsm.go +++ b/jsm.go @@ -887,6 +887,20 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt } var apiSubj string + + doDirectGetLastBySubject := o.directGet && mreq.LastFor != "" + + if doDirectGetLastBySubject && js.nc.serverMinVersion(2, 9, 0) { + apiSubj = apiDirectMsgGetLastBySubjectT + dsSubj := js.apiSubj(fmt.Sprintf(apiSubj, name, mreq.LastFor)) + r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil) + if err != nil { + return nil, err + } + + return convertDirectGetMsgResponseToMsg(name, r) + } + if o.directGet { apiSubj = apiDirectMsgGetT mreq.NextFor = o.directNextFor From 15f04809112b4290ebc0045ed725d93bf41fa15c Mon Sep 17 00:00:00 2001 From: Jeremy Saenz Date: Wed, 3 Aug 2022 10:04:15 -0700 Subject: [PATCH 2/6] Remove server version check --- jsm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jsm.go b/jsm.go index 1b36eab87..faeacabb7 100644 --- a/jsm.go +++ b/jsm.go @@ -890,7 +890,7 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt doDirectGetLastBySubject := o.directGet && mreq.LastFor != "" - if doDirectGetLastBySubject && js.nc.serverMinVersion(2, 9, 0) { + if doDirectGetLastBySubject { apiSubj = apiDirectMsgGetLastBySubjectT dsSubj := js.apiSubj(fmt.Sprintf(apiSubj, name, mreq.LastFor)) r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil) From 139a1175d903717fa64f12ad3c78f14876edf642 Mon Sep 17 00:00:00 2001 From: Jeremy Saenz Date: Wed, 3 Aug 2022 10:31:11 -0700 Subject: [PATCH 3/6] Upgraded server dependency, added test --- go_test.mod | 3 ++- go_test.sum | 5 +++++ test/js_test.go | 13 +++++++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/go_test.mod b/go_test.mod index 9de5e151e..a4fba6970 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.17 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4 + github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 @@ -14,6 +14,7 @@ require ( github.com/klauspost/compress v1.15.9 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.3.0 // indirect + go.uber.org/automaxprocs v1.5.1 // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect diff --git a/go_test.sum b/go_test.sum index 8fa507d3b..0f77e2cfa 100644 --- a/go_test.sum +++ b/go_test.sum @@ -20,8 +20,12 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= +github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4 h1:hAhXb/iuQNPZe9y0wFift4hkiquMlNF5WuMhpoMlUUM= github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg= +github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116 h1:NoZ5jkLgMNijnDh96QENq4M06AF34GXlvaYtHGXm/Jk= +github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg= github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= @@ -32,6 +36,7 @@ github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk= go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= diff --git a/test/js_test.go b/test/js_test.go index 26415e118..576f80da1 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -7409,6 +7409,19 @@ func TestJetStreamDirectGetMsg(t *testing.T) { if _, err := js.GetMsg("DGM", 0, nats.DirectGet()); err == nil || !strings.Contains(err.Error(), "Empty Request") { t.Fatalf("Unexpected error: %v", err) } + + // Test direct get by subject by trying to get 'bar' directly + r, err = js.GetLastMsg("DGM", "bar", nats.DirectGet()) + if err != nil { + t.Fatalf("Error getting message: %v", err) + } + if r.Subject != "bar" { + t.Fatalf("expected subject to be 'bar', got: %v", r.Subject) + } + if string(r.Data) != "d" { + t.Fatalf("Error getting message: %v", err) + t.Fatalf("expected data to be 'd', got: %v", string(r.Data)) + } } func TestJetStreamConsumerReplicasOption(t *testing.T) { From 639de56eedb56b364e1eee7d38e73d3b44a084aa Mon Sep 17 00:00:00 2001 From: Jeremy Saenz Date: Wed, 3 Aug 2022 10:42:57 -0700 Subject: [PATCH 4/6] go mod tidy --- go_test.mod | 1 - go_test.sum | 5 ----- 2 files changed, 6 deletions(-) diff --git a/go_test.mod b/go_test.mod index a4fba6970..a879c15a2 100644 --- a/go_test.mod +++ b/go_test.mod @@ -14,7 +14,6 @@ require ( github.com/klauspost/compress v1.15.9 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.3.0 // indirect - go.uber.org/automaxprocs v1.5.1 // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect diff --git a/go_test.sum b/go_test.sum index 0f77e2cfa..ece6fa6e1 100644 --- a/go_test.sum +++ b/go_test.sum @@ -20,10 +20,6 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= -github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= -github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4 h1:hAhXb/iuQNPZe9y0wFift4hkiquMlNF5WuMhpoMlUUM= -github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg= github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116 h1:NoZ5jkLgMNijnDh96QENq4M06AF34GXlvaYtHGXm/Jk= github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg= github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= @@ -36,7 +32,6 @@ github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk= go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= From c7c26988a6e899433d82109b50e25fe6fa438962 Mon Sep 17 00:00:00 2001 From: Jeremy Saenz Date: Wed, 3 Aug 2022 10:43:22 -0700 Subject: [PATCH 5/6] remove bad t.Fatalf line --- test/js_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/js_test.go b/test/js_test.go index 576f80da1..e29ab4d29 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -7419,7 +7419,6 @@ func TestJetStreamDirectGetMsg(t *testing.T) { t.Fatalf("expected subject to be 'bar', got: %v", r.Subject) } if string(r.Data) != "d" { - t.Fatalf("Error getting message: %v", err) t.Fatalf("expected data to be 'd', got: %v", string(r.Data)) } } From e7a08b8f375888d595bf3e52eaa1761ccbe1c503 Mon Sep 17 00:00:00 2001 From: Jeremy Saenz Date: Wed, 3 Aug 2022 10:47:45 -0700 Subject: [PATCH 6/6] Remove update to MaxWaiting as the server does not support it --- test/js_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/js_test.go b/test/js_test.go index e29ab4d29..e5c229ae5 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1408,7 +1408,6 @@ func TestJetStreamManagement(t *testing.T) { expected.MaxDeliver = 1 expected.SampleFrequency = "30" expected.MaxAckPending = 10 - expected.MaxWaiting = 20 expected.HeadersOnly = true expected.MaxRequestBatch = 10 expected.MaxRequestExpires = 2 * time.Second