Skip to content

Commit

Permalink
Merge pull request #1020 from nats-io/js_direct_get
Browse files Browse the repository at this point in the history
[ADDED] JetStream: nats.DirectGet() and nats.DirectGetNext() options
  • Loading branch information
kozlovic committed Jul 29, 2022
2 parents f4a86f3 + b22f6cb commit daee313
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 32 deletions.
12 changes: 6 additions & 6 deletions go_test.mod
Expand Up @@ -4,17 +4,17 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats-server/v2 v2.8.5-0.20220725185857-9446170af436
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

require (
github.com/klauspost/compress v1.14.4 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
)
41 changes: 28 additions & 13 deletions go_test.sum
@@ -1,3 +1,5 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
Expand All @@ -9,35 +11,44 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4=
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220725185857-9446170af436 h1:1klEMbLkqGBWaS5L/WM1rNL5n/yL4weFaWkgV+jXPVU=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220725185857-9446170af436/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w=
golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand All @@ -48,3 +59,7 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
35 changes: 34 additions & 1 deletion js.go
Expand Up @@ -166,6 +166,9 @@ const (
// apiMsgGetT is the endpoint to get a message.
apiMsgGetT = "STREAM.MSG.GET.%s"

// apiMsgGetT is the endpoint to perform a direct get of a message.
apiDirectMsgGetT = "DIRECT.GET.%s"

// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"

Expand Down Expand Up @@ -233,6 +236,10 @@ type jsOpts struct {
purgeOpts *StreamPurgeRequest
// streamInfoOpts contains optional stream info options
streamInfoOpts *StreamInfoRequest
// For direct get message requests
directGet bool
// For direct get next message
directNextFor string
}

const (
Expand Down Expand Up @@ -325,6 +332,30 @@ func APIPrefix(pre string) JSOpt {
})
}

// DirectGet is an option that can be used to make GetMsg() or GetLastMsg()
// retrieve message directly from a group of servers (leader and replicas)
// if the stream was created with the AllowDirect option.
func DirectGet() JSOpt {
return jsOptFn(func(js *jsOpts) error {
js.directGet = true
return nil
})
}

// DirectGetNext is an option that can be used to make GetMsg() retrieve message
// directly from a group of servers (leader and replicas) if the stream was
// created with the AllowDirect option.
// The server will find the next message matching the filter `subject` starting
// at the start sequence (argument in GetMsg()). The filter `subject` can be a
// wildcard.
func DirectGetNext(subject string) JSOpt {
return jsOptFn(func(js *jsOpts) error {
js.directGet = true
js.directNextFor = subject
return nil
})
}

func (js *js) apiSubj(subj string) string {
if js.opts.pre == _EMPTY_ {
return subj
Expand Down Expand Up @@ -388,10 +419,12 @@ const (
MsgRollup = "Nats-Rollup"
)

// Headers for republished messages.
// Headers for republished messages and direct gets.
const (
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSTimeStamp = "Nats-Time-Stamp"
JSSubject = "Nats-Subject"
JSLastSequence = "Nats-Last-Sequence"
)

Expand Down
73 changes: 73 additions & 0 deletions js_test.go
Expand Up @@ -1026,3 +1026,76 @@ func TestJetStreamClusterPlacement(t *testing.T) {
t.Fatalf("Unexpected tag: %q", v)
}
}

func TestJetStreamConvertDirectMsgResponseToMsg(t *testing.T) {
// This test checks the conversion of a "direct get message" response
// to a JS message based on the content of specific NATS headers.
// It is very specific to the order headers retrieval is made in
// convertDirectGetMsgResponseToMsg(), so it may need adjustment
// if changes are made there.

msg := NewMsg("inbox")

check := func(errTxt string) {
t.Helper()
m, err := convertDirectGetMsgResponseToMsg("test", msg)
if err == nil || !strings.Contains(err.Error(), errTxt) {
t.Fatalf("Expected error contain %q, got %v", errTxt, err)
}
if m != nil {
t.Fatalf("Expected nil message, got %v", m)
}
}

check("should have headers")

msg.Header.Set(statusHdr, noMessagesSts)
check(ErrMsgNotFound.Error())

msg.Header.Set(statusHdr, reqTimeoutSts)
check("unable to get message")

msg.Header.Set(descrHdr, "some error text")
check("some error text")

msg.Header.Del(statusHdr)
msg.Header.Del(descrHdr)
msg.Header.Set("some", "header")
check("missing stream")

msg.Header.Set(JSStream, "other")
check("stream header is 'other', not 'test'")

msg.Header.Set(JSStream, "test")
check("missing sequence")

msg.Header.Set(JSSequence, "abc")
check("invalid sequence")

msg.Header.Set(JSSequence, "1")
check("missing timestamp")

msg.Header.Set(JSTimeStamp, "aaaaaaaaa bbbbbbbbbbbb cccccccccc ddddddddddd eeeeeeeeee ffffff")
check("invalid timestamp")

msg.Header.Set(JSTimeStamp, "2006-01-02 15:04:05.999999999 +0000 UTC")
check("missing subject")

msg.Header.Set(JSSubject, "foo")
r, err := convertDirectGetMsgResponseToMsg("test", msg)
if err != nil {
t.Fatalf("Error during convert: %v", err)
}
if r.Subject != "foo" {
t.Fatalf("Expected subject to be 'foo', got %q", r.Subject)
}
if r.Sequence != 1 {
t.Fatalf("Expected sequence to be 1, got %v", r.Sequence)
}
if r.Time.UnixNano() != 0xFC4A4D639917BFF {
t.Fatalf("Invalid timestamp: %v", r.Time.UnixNano())
}
if r.Header.Get("some") != "header" {
t.Fatalf("Wrong header: %v", r.Header)
}
}
93 changes: 89 additions & 4 deletions jsm.go
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -46,8 +47,17 @@ type JetStreamManager interface {
StreamNames(opts ...JSOpt) <-chan string

// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
// Use options nats.DirectGet() or nats.DirectGetNext() to trigger retrieval
// directly from a distributed group of servers (leader and replicas).
// The stream must have been created/updated with the AllowDirect boolean.
GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error)

// GetLastMsg retrieves the last raw stream message stored in JetStream by subject.
// Use option nats.DirectGet() to trigger retrieval
// directly from a distributed group of servers (leader and replicas).
// The stream must have been created/updated with the AllowDirect boolean.
GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error)

// DeleteMsg erases a message from a stream.
DeleteMsg(name string, seq uint64, opts ...JSOpt) error

Expand Down Expand Up @@ -100,10 +110,14 @@ type StreamConfig struct {
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
AllowDirect bool `json:"allow_direct,omitempty"`

// Allow republish of the message after being sequenced and stored.
RePublish *RePublish `json:"republish,omitempty"`

// Allow higher performance, direct access to get individual messages. E.g. KeyValue
AllowDirect bool `json:"allow_direct,omitempty"`
// Allow higher performance and unified direct access for mirrors as well.
MirrorDirect bool `json:"mirror_direct,omitempty"`
}

// RePublish is for republishing messages once committed to a stream. The original
Expand Down Expand Up @@ -812,6 +826,7 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
type apiMsgGetRequest struct {
Seq uint64 `json:"seq,omitempty"`
LastFor string `json:"last_by_subj,omitempty"`
NextFor string `json:"next_by_subj,omitempty"`
}

// RawStreamMsg is a raw message stored in JetStream.
Expand Down Expand Up @@ -858,21 +873,33 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
defer cancel()
}

if name == _EMPTY_ {
return nil, ErrStreamNameRequired
if err := checkStreamName(name); err != nil {
return nil, err
}

var apiSubj string
if o.directGet {
apiSubj = apiDirectMsgGetT
mreq.NextFor = o.directNextFor
} else {
apiSubj = apiMsgGetT
}

req, err := json.Marshal(mreq)
if err != nil {
return nil, err
}

dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name))
dsSubj := js.apiSubj(fmt.Sprintf(apiSubj, name))
r, err := js.apiRequestWithContext(o.ctx, dsSubj, req)
if err != nil {
return nil, err
}

if o.directGet {
return convertDirectGetMsgResponseToMsg(name, r)
}

var resp apiMsgGetResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
Expand Down Expand Up @@ -903,6 +930,64 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
}, nil
}

func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error) {
// Check for 404/408. We would get a no-payload message and a "Status" header
if len(r.Data) == 0 {
val := r.Header.Get(statusHdr)
if val != _EMPTY_ {
switch val {
case noMessagesSts:
return nil, ErrMsgNotFound
default:
desc := r.Header.Get(descrHdr)
if desc == _EMPTY_ {
desc = "unable to get message"
}
return nil, fmt.Errorf("nats: %s", desc)
}
}
}
// Check for headers that give us the required information to
// reconstruct the message.
if len(r.Header) == 0 {
return nil, fmt.Errorf("nats: response should have headers")
}
stream := r.Header.Get(JSStream)
if stream == _EMPTY_ {
return nil, fmt.Errorf("nats: missing stream header")
}
if stream != name {
return nil, fmt.Errorf("nats: response stream header is '%s', not '%s'", stream, name)
}
seqStr := r.Header.Get(JSSequence)
if seqStr == _EMPTY_ {
return nil, fmt.Errorf("nats: missing sequence header")
}
seq, err := strconv.ParseUint(seqStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("nats: invalid sequence header '%s': %v", seqStr, err)
}
timeStr := r.Header.Get(JSTimeStamp)
if timeStr == _EMPTY_ {
return nil, fmt.Errorf("nats: missing timestamp header")
}
tm, err := time.Parse("2006-01-02 15:04:05.999999999 +0000 UTC", timeStr)
if err != nil {
return nil, fmt.Errorf("nats: invalid timestamp header '%s': %v", timeStr, err)
}
subj := r.Header.Get(JSSubject)
if subj == _EMPTY_ {
return nil, fmt.Errorf("nats: missing subject header")
}
return &RawStreamMsg{
Subject: subj,
Sequence: seq,
Header: r.Header,
Data: r.Data,
Time: tm,
}, nil
}

type msgDeleteRequest struct {
Seq uint64 `json:"seq"`
}
Expand Down

0 comments on commit daee313

Please sign in to comment.