Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] JetStream: nats.DirectGet() and nats.DirectGetNext() options #1020

Merged
merged 2 commits into from Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions go_test.mod
Expand Up @@ -4,17 +4,17 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats-server/v2 v2.8.5-0.20220725185857-9446170af436
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

require (
github.com/klauspost/compress v1.14.4 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
)
41 changes: 28 additions & 13 deletions go_test.sum
@@ -1,3 +1,5 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
Expand All @@ -9,35 +11,44 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4=
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220725185857-9446170af436 h1:1klEMbLkqGBWaS5L/WM1rNL5n/yL4weFaWkgV+jXPVU=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220725185857-9446170af436/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w=
golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand All @@ -48,3 +59,7 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
7 changes: 6 additions & 1 deletion js.go
Expand Up @@ -166,6 +166,9 @@ const (
// apiMsgGetT is the endpoint to get a message.
apiMsgGetT = "STREAM.MSG.GET.%s"

// apiMsgGetT is the endpoint to perform a direct get of a message.
apiDirectMsgGetT = "DIRECT.GET.%s"

// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"

Expand Down Expand Up @@ -388,10 +391,12 @@ const (
MsgRollup = "Nats-Rollup"
)

// Headers for republished messages.
// Headers for republished messages and direct gets.
const (
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSTimeStamp = "Nats-Time-Stamp"
JSSubject = "Nats-Subject"
JSLastSequence = "Nats-Last-Sequence"
)

Expand Down
62 changes: 62 additions & 0 deletions js_test.go
Expand Up @@ -1026,3 +1026,65 @@ func TestJetStreamClusterPlacement(t *testing.T) {
t.Fatalf("Unexpected tag: %q", v)
}
}

func TestJetStreamConvertDirectMsgResponseToMsg(t *testing.T) {
// This test checks the conversion of a "direct get message" response
// to a JS message based on the content of specific NATS headers.
// It is very specific to the order headers retrieval is made in
// convertDirectGetMsgResponseToMsg(), so it may need adjustment
// if changes are made there.

msg := NewMsg("inbox")

check := func(errTxt string) {
t.Helper()
m, err := convertDirectGetMsgResponseToMsg("test", msg)
if err == nil || !strings.Contains(err.Error(), errTxt) {
t.Fatalf("Expected error contain %q, got %v", errTxt, err)
}
if m != nil {
t.Fatalf("Expected nil message, got %v", m)
}
}

check("should have headers")

msg.Header.Set("some", "header")
check("missing stream")

msg.Header.Set(JSStream, "other")
check("stream header is 'other', not 'test'")

msg.Header.Set(JSStream, "test")
check("missing sequence")

msg.Header.Set(JSSequence, "abc")
check("invalid sequence")

msg.Header.Set(JSSequence, "1")
check("missing timestamp")

msg.Header.Set(JSTimeStamp, "aaaaaaaaa bbbbbbbbbbbb cccccccccc ddddddddddd eeeeeeeeee ffffff")
check("invalid timestamp")

msg.Header.Set(JSTimeStamp, "2006-01-02 15:04:05.999999999 +0000 UTC")
check("missing subject")

msg.Header.Set(JSSubject, "foo")
r, err := convertDirectGetMsgResponseToMsg("test", msg)
if err != nil {
t.Fatalf("Error during convert: %v", err)
}
if r.Subject != "foo" {
t.Fatalf("Expected subject to be 'foo', got %q", r.Subject)
}
if r.Sequence != 1 {
t.Fatalf("Expected sequence to be 1, got %v", r.Sequence)
}
if r.Time.UnixNano() != 0xFC4A4D639917BFF {
t.Fatalf("Invalid timestamp: %v", r.Time.UnixNano())
}
if r.Header.Get("some") != "header" {
t.Fatalf("Wrong header: %v", r.Header)
}
}
106 changes: 105 additions & 1 deletion jsm.go
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -48,6 +49,14 @@ type JetStreamManager interface {
// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to get the stream somewhere for this call? If so can we overload instead of introducing DirectGetMsg?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to get the stream somewhere for this call?

Sorry, I am not sure what you mean here.

We could add the DirectGetMsgRequest as a JSOpt (like we do for stream info and purge), but it still would look weird since in GetMsg() "seq" is a mandatory param, so one would have to pass seq==0 if passing a DirectGetMsgRequest option.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think to make the call we have we at least need stream name. I was not sure if we had more such that we could remember the state of directAllowed.

I might opt for here to have direct be a subOpt. KV is where it is critical path, so this not as important IMO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think to make the call we have we at least need stream name. I was not sure if we had more such that we could remember the state of directAllowed.

I think I understand, but GetMsg() is not looking up the stream, so there is no state to remember if AllowDirect is set or not. The comment in server code was that if one does a "direct get" to a stream that is not set for allow direct, it would get a timeout or no responder.

I might opt for here to have direct be a subOpt.

But then it means something like: GetMsg("myStream", 0, &nats.DirectGetMsgRequest{NextFor: "foo"}). I could remove "Seq" from the option request and say that if it is provided in the GetMsg() call (2nd param) then use this one to construct the request? (would have to have a non exported request struct WITH the sequence for the marshal'ing though). That looks a bit ugly.

KV is where it is critical path, so this not as important IMO.

Well, as you can see, in KV I do call the new API, so if it is GetMsg() with option, I would still need to pass the option to that call, so not sure if that makes any difference. Meaning KV is using the API...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add new opt, nats.DirectGet, or nats.AllowDirect or something, to make it simpler.

Would you push down that path or just keep GetMsgDirect?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The management path is not hot path, so I would say let's try to make an option. Under covers directGet() should exist and KV can use that directly..

Copy link
Member

@aricart aricart Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the JavaScript client this is implemented like this:

https://github.com/nats-io/nats.deno/blob/main/nats-base-client/jsmdirect_api.ts

The DirectMsgRequest type is really an union of the possible JSON option combinations - the compiler will reject if the shape of the JSON doesn't line up with one of the types:

export type DirectMsgRequest =
  | SeqMsgRequest
  | LastForMsgRequest
  | NextMsgRequest;

export type NextMsgRequest = {
  seq: number;
  next_by_subj: string;
};

export interface SeqMsgRequest {
  seq: number;
}

export interface LastForMsgRequest {
  "last_by_subj": string;
}

The permutation of the options complicates the API for other languages (like Go), I think instead should have the 3 function that takes the required arguments for the particular combination. That assigns a name for the query they are doing, and helps them out.

Copy link
Member

@wallyqs wallyqs Jul 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that to avoid adding another API, we could make a new type that implements configureJSContext so that it can be used as a JSOpt, then that way it would be possible to call like this for the direct version:

js.GetMsg("foo", 1, &nats.GetMsgRequest{Direct: true, NextFor: "bar"}) // Direct: false if not direct needed

Copy link
Member Author

@kozlovic kozlovic Jul 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then for a direct and LastFor, where seq is not supposed to be present, it will be:

js.GetMsg("foo", 0, &nats.GetMsgRequest{Direct: true, LastFor: "bar"})

Notice that user will have to pass 0 for the sequence. If that is ok with everybody, we can do that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe for LastFor we suggest using GetLastMsg instead and only allow &nats.GetMsgRequest{Direct: true} option?


// GetLastMsg retrieves the last raw stream message stored in JetStream by subject.
GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error)

// DirectGetMsg retrieves directly a raw stream message stored in JetStream from a
// distributed group of servers. The stream must have been created/updated with the
// AllowDirect boolean.
DirectGetMsg(name string, dgo *DirectGetMsgRequest, opts ...JSOpt) (*RawStreamMsg, error)

// DeleteMsg erases a message from a stream.
DeleteMsg(name string, seq uint64, opts ...JSOpt) error

Expand Down Expand Up @@ -100,10 +109,14 @@ type StreamConfig struct {
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
AllowDirect bool `json:"allow_direct,omitempty"`

// Allow republish of the message after being sequenced and stored.
RePublish *RePublish `json:"republish,omitempty"`

// Allow higher performance, direct access to get individual messages. E.g. KeyValue
AllowDirect bool `json:"allow_direct,omitempty"`
// Allow higher performance and unified direct access for mirrors as well.
MirrorDirect bool `json:"mirror_direct,omitempty"`
}

// RePublish is for republishing messages once committed to a stream. The original
Expand Down Expand Up @@ -903,6 +916,97 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
}, nil
}

type DirectGetMsgRequest struct {
Seq uint64 `json:"seq,omitempty"`
LastFor string `json:"last_by_subj,omitempty"`
NextFor string `json:"next_by_subj,omitempty"`
}

func (js *js) DirectGetMsg(name string, dgo *DirectGetMsgRequest, opts ...JSOpt) (*RawStreamMsg, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
}
if err := checkStreamName(name); err != nil {
return nil, err
}
if dgo == nil {
return nil, fmt.Errorf("nats: direct get message request is required")
}
req, err := json.Marshal(dgo)
if err != nil {
return nil, err
}
dsSubj := js.apiSubj(fmt.Sprintf(apiDirectMsgGetT, name))
r, err := js.apiRequestWithContext(o.ctx, dsSubj, req)
if err != nil {
return nil, err
}

// Check for 404/408. We would get a no-payload message and a "Status" header
if len(r.Data) == 0 {
val := r.Header.Get(statusHdr)
if val != _EMPTY_ {
switch val {
case noMessagesSts:
return nil, ErrMsgNotFound
default:
desc := r.Header.Get(descrHdr)
if desc == _EMPTY_ {
desc = "unable to get message"
}
return nil, fmt.Errorf("nats: %s", desc)
}
}
}
return convertDirectGetMsgResponseToMsg(name, r)
}

func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error) {
// Check for headers that give us the required information to
// reconstruct the message.
if len(r.Header) == 0 {
return nil, fmt.Errorf("nats: response should have headers")
}
stream := r.Header.Get(JSStream)
if stream == _EMPTY_ {
return nil, fmt.Errorf("nats: missing stream header")
}
if stream != name {
return nil, fmt.Errorf("nats: response stream header is '%s', not '%s'", stream, name)
}
seqStr := r.Header.Get(JSSequence)
if seqStr == _EMPTY_ {
return nil, fmt.Errorf("nats: missing sequence header")
}
seq, err := strconv.ParseUint(seqStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("nats: invalid sequence header '%s': %v", seqStr, err)
}
timeStr := r.Header.Get(JSTimeStamp)
if timeStr == _EMPTY_ {
return nil, fmt.Errorf("nats: missing timestamp header")
}
tm, err := time.Parse("2006-01-02 15:04:05.999999999 +0000 UTC", timeStr)
if err != nil {
return nil, fmt.Errorf("nats: invalid timestamp header '%s': %v", timeStr, err)
}
subj := r.Header.Get(JSSubject)
if subj == _EMPTY_ {
return nil, fmt.Errorf("nats: missing subject header")
}
return &RawStreamMsg{
Subject: subj,
Sequence: seq,
Header: r.Header,
Data: r.Data,
Time: tm,
}, nil
}

type msgDeleteRequest struct {
Seq uint64 `json:"seq"`
}
Expand Down