diff --git a/go_test.mod b/go_test.mod index 4d301c8a5..a868bac29 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 + github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 05586f46f..5f7cda601 100644 --- a/go_test.sum +++ b/go_test.sum @@ -15,33 +15,24 @@ github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEE github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= -github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= -github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 h1:TYI6K487xhbbpKjz4gIIVBWL6l2gFI3JHu/N0XySwRY= -github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6/go.mod h1:xZLDZ6cRUu9FCh7+mKXGEy16O66CdWVxttxNIiUuNCk= -github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/jwt/v2 v2.1.0 h1:1UbfD5g1xTdWmSeRV8bh/7u+utTiBsRtWhLl1PixZp4= +github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da h1:0snsE4pD2VKIsFiRMRkHFY+SJZVbT7/eZJ1lOt5XuLA= +github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80= +github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/js.go b/js.go index d3f5dc9d0..361c15a6c 100644 --- a/js.go +++ b/js.go @@ -30,6 +30,85 @@ import ( "github.com/nats-io/nuid" ) +// JetStream allows persistent messaging through JetStream. +type JetStream interface { + // Publish publishes a message to JetStream. + Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) + + // PublishMsg publishes a Msg to JetStream. + PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) + + // PublishAsync publishes a message to JetStream and returns a PubAckFuture. + // The data should not be changed until the PubAckFuture has been processed. + PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) + + // PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture. + // The message should not be changed until the PubAckFuture has been processed. + PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) + + // PublishAsyncPending returns the number of async publishes outstanding for this context. + PublishAsyncPending() int + + // PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd. + PublishAsyncComplete() <-chan struct{} + + // Subscribe creates an async Subscription for JetStream. + // The stream and consumer names can be provided with the nats.Bind() option. + // For creating an ephemeral (where the consumer name is picked by the server), + // you can provide the stream name with nats.BindStream(). + // If no stream name is specified, the library will attempt to figure out which + // stream the subscription is for. See important notes below for more details. + // + // IMPORTANT NOTES: + // * If none of the options Bind() nor Durable() are specified, the library will + // send a request to the server to create an ephemeral JetStream consumer, + // which will be deleted after an Unsubscribe() or Drain(), or automatically + // by the server after a short period of time after the NATS subscription is + // gone. + // * If Durable() option is specified, the library will attempt to lookup a JetStream + // consumer with this name, and if found, will bind to it and not attempt to + // delete it. However, if not found, the library will send a request to create + // such durable JetStream consumer. The library will delete the JetStream consumer + // after an Unsubscribe() or Drain(). + // * If Bind() option is provided, the library will attempt to lookup the + // consumer with the given name, and if successful, bind to it. If the lookup fails, + // then the Subscribe() call will return an error. + Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) + + // SubscribeSync creates a Subscription that can be used to process messages synchronously. + // See important note in Subscribe() + SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) + + // ChanSubscribe creates channel based Subscription. + // See important note in Subscribe() + ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) + + // ChanQueueSubscribe creates channel based Subscription with a queue group. + // See important note in QueueSubscribe() + ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) + + // QueueSubscribe creates a Subscription with a queue group. + // If no optional durable name nor binding options are specified, the queue name will be used as a durable name. + // See important note in Subscribe() + QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) + + // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. + // See important note in QueueSubscribe() + QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) + + // PullSubscribe creates a Subscription that can fetch messages. + // See important note in Subscribe() + PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) +} + +// JetStreamContext allows JetStream messaging and stream management. +type JetStreamContext interface { + JetStream + JetStreamManager + KeyValueManager + ObjectStoreManager +} + // Request API subjects for JetStream. const ( // defaultAPIPrefix is the default prefix for the JetStream API. @@ -110,84 +189,6 @@ const ( jsCtrlFC = 2 ) -// JetStream allows persistent messaging through JetStream. -type JetStream interface { - // Publish publishes a message to JetStream. - Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) - - // PublishMsg publishes a Msg to JetStream. - PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) - - // PublishAsync publishes a message to JetStream and returns a PubAckFuture. - // The data should not be changed until the PubAckFuture has been processed. - PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) - - // PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture. - // The message should not be changed until the PubAckFuture has been processed. - PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) - - // PublishAsyncPending returns the number of async publishes outstanding for this context. - PublishAsyncPending() int - - // PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd. - PublishAsyncComplete() <-chan struct{} - - // Subscribe creates an async Subscription for JetStream. - // The stream and consumer names can be provided with the nats.Bind() option. - // For creating an ephemeral (where the consumer name is picked by the server), - // you can provide the stream name with nats.BindStream(). - // If no stream name is specified, the library will attempt to figure out which - // stream the subscription is for. See important notes below for more details. - // - // IMPORTANT NOTES: - // * If none of the options Bind() nor Durable() are specified, the library will - // send a request to the server to create an ephemeral JetStream consumer, - // which will be deleted after an Unsubscribe() or Drain(), or automatically - // by the server after a short period of time after the NATS subscription is - // gone. - // * If Durable() option is specified, the library will attempt to lookup a JetStream - // consumer with this name, and if found, will bind to it and not attempt to - // delete it. However, if not found, the library will send a request to create - // such durable JetStream consumer. The library will delete the JetStream consumer - // after an Unsubscribe() or Drain(). - // * If Bind() option is provided, the library will attempt to lookup the - // consumer with the given name, and if successful, bind to it. If the lookup fails, - // then the Subscribe() call will return an error. - Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) - - // SubscribeSync creates a Subscription that can be used to process messages synchronously. - // See important note in Subscribe() - SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) - - // ChanSubscribe creates channel based Subscription. - // See important note in Subscribe() - ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) - - // ChanQueueSubscribe creates channel based Subscription with a queue group. - // See important note in QueueSubscribe() - ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) - - // QueueSubscribe creates a Subscription with a queue group. - // If no optional durable name nor binding options are specified, the queue name will be used as a durable name. - // In that case, the queue name cannot contain dots ".", same restriction that is applied to a durable name. - // See important note in Subscribe() - QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) - - // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. - // See important note in QueueSubscribe() - QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) - - // PullSubscribe creates a Subscription that can fetch messages. - // See important note in Subscribe() - PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) -} - -// JetStreamContext allows JetStream messaging and stream management. -type JetStreamContext interface { - JetStream - JetStreamManager -} - // js is an internal struct from a JetStreamContext. type js struct { nc *Conn @@ -320,6 +321,16 @@ const ( ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence" ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence" ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" + MsgRollup = "Nats-Rollup" +) + +// MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested. +const MsgSize = "Nats-Msg-Size" + +// Rollups, can be subject only or all messages. +const ( + MsgRollupSubject = "sub" + MsgRollupAll = "all" ) // PublishMsg publishes a Msg to a stream from JetStream. @@ -667,10 +678,14 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if m.Reply != _EMPTY_ { return nil, errors.New("nats: reply subject should be empty") } + reply := m.Reply m.Reply = js.newAsyncReply() + defer func() { m.Reply = reply }() + if m.Reply == _EMPTY_ { return nil, errors.New("nats: error creating async reply handler") } + id := m.Reply[aReplyPreLen:] paf := &pubAckFuture{msg: m, st: time.Now()} numPending, maxPending := js.registerPAF(id, paf) @@ -683,7 +698,6 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { return nil, errors.New("nats: stalled with too many outstanding async published messages") } } - if err := js.nc.PublishMsg(m); err != nil { js.clearPAF(id) return nil, err @@ -841,6 +855,7 @@ type ConsumerConfig struct { MaxAckPending int `json:"max_ack_pending,omitempty"` FlowControl bool `json:"flow_control,omitempty"` Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` + HeadersOnly bool `json:"headers_only,omitempty"` } // ConsumerInfo is the info from a JetStream consumer. @@ -2120,6 +2135,14 @@ func DeliverSubject(subject string) SubOpt { }) } +// HeadersOnly() will instruct the consumer to only deleiver headers and no payloads. +func HeadersOnly() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.HeadersOnly = true + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. diff --git a/jsm.go b/jsm.go index a23a4941a..95f38b593 100644 --- a/jsm.go +++ b/jsm.go @@ -93,6 +93,10 @@ type StreamConfig struct { Placement *Placement `json:"placement,omitempty"` Mirror *StreamSource `json:"mirror,omitempty"` Sources []*StreamSource `json:"sources,omitempty"` + Sealed bool `json:"sealed,omitempty"` + DenyDelete bool `json:"deny_delete,omitempty"` + DenyPurge bool `json:"deny_purge,omitempty"` + AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` } // Placement is used to guide placement of streams in clustered JetStream. @@ -726,7 +730,8 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { } type apiMsgGetRequest struct { - Seq uint64 `json:"seq"` + Seq uint64 `json:"seq,omitempty"` + LastFor string `json:"last_by_subj,omitempty"` } // RawStreamMsg is a raw message stored in JetStream. @@ -751,11 +756,20 @@ type storedMsg struct { type apiMsgGetResponse struct { apiResponse Message *storedMsg `json:"message,omitempty"` - Success bool `json:"success,omitempty"` +} + +// GetLastMsg retrieves the last raw stream message stored in JetStream by subject. +func (js *js) GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error) { + return js.getMsg(name, &apiMsgGetRequest{LastFor: subject}, opts...) } // GetMsg retrieves a raw stream message stored in JetStream by sequence number. func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) { + return js.getMsg(name, &apiMsgGetRequest{Seq: seq}, opts...) +} + +// Low level getMsg +func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawStreamMsg, error) { o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return nil, err @@ -768,7 +782,7 @@ func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, err return nil, ErrStreamNameRequired } - req, err := json.Marshal(&apiMsgGetRequest{Seq: seq}) + req, err := json.Marshal(mreq) if err != nil { return nil, err } @@ -784,13 +798,16 @@ func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, err return nil, err } if resp.Error != nil { - return nil, errors.New(resp.Error.Description) + if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") { + return nil, ErrMsgNotFound + } + return nil, fmt.Errorf("nats: %s", resp.Error.Description) } msg := resp.Message var hdr Header - if msg.Header != nil { + if len(msg.Header) > 0 { hdr, err = decodeHeadersMsg(msg.Header) if err != nil { return nil, err @@ -850,6 +867,16 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error { return nil } +// purgeRequest is optional request information to the purge API. +type streamPurgeRequest struct { + // Purge up to but not including sequence. + Sequence uint64 `json:"seq,omitempty"` + // Subject to match against messages for the purge command. + Subject string `json:"filter,omitempty"` + // Number of messages to keep. + Keep uint64 `json:"keep,omitempty"` +} + type streamPurgeResponse struct { apiResponse Success bool `json:"success,omitempty"` @@ -857,7 +884,11 @@ type streamPurgeResponse struct { } // PurgeStream purges messages on a Stream. -func (js *js) PurgeStream(name string, opts ...JSOpt) error { +func (js *js) PurgeStream(stream string, opts ...JSOpt) error { + return js.purgeStream(stream, nil) +} + +func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt) error { o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return err @@ -866,8 +897,15 @@ func (js *js) PurgeStream(name string, opts ...JSOpt) error { defer cancel() } - psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, name)) - r, err := js.nc.RequestWithContext(o.ctx, psSubj, nil) + var b []byte + if req != nil { + if b, err = json.Marshal(req); err != nil { + return err + } + } + + psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, stream)) + r, err := js.nc.RequestWithContext(o.ctx, psSubj, b) if err != nil { return err } diff --git a/kv.go b/kv.go new file mode 100644 index 000000000..cf5468bce --- /dev/null +++ b/kv.go @@ -0,0 +1,644 @@ +// Copyright 2021 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nats + +import ( + "context" + "errors" + "fmt" + "regexp" + "strconv" + "strings" + "time" +) + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. +type KeyValueManager interface { + // KeyValue will lookup and bind to an existing KeyValue store. + KeyValue(bucket string) (KeyValue, error) + // CreateKeyValue will create a KeyValue store with the following configuration. + CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) + // DeleteKeyValue will delete this KeyValue store (JetStream stream). + DeleteKeyValue(bucket string) error +} + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. +type KeyValue interface { + // Get returns the latest value for the key. + Get(key string) (entry KeyValueEntry, err error) + // Put will place the new value for the key into the store. + Put(key string, value []byte) (revision uint64, err error) + // PutString will place the string for the key into the store. + PutString(key string, value string) (revision uint64, err error) + // Create will add the key/value pair iff it does not exist. + Create(key string, value []byte) (revision uint64, err error) + // Update will update the value iff the latest revision matches. + Update(key string, value []byte, last uint64) (revision uint64, err error) + // Delete will place a delete marker and leave all revisions. + Delete(key string) error + // Purge will place a delete marker and remove all previous revisions. + Purge(key string) error + // Watch for any updates to keys that match the keys argument which could include wildcards. + // Watch will send a nil entry when it has received all initial values. + Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) + // WatchAll will invoke the callback for all updates. + WatchAll(opts ...WatchOpt) (KeyWatcher, error) + // Keys will return all keys. + Keys(opts ...WatchOpt) ([]string, error) + // History will return all historical values for the key. + History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) + // Bucket returns the current bucket name. + Bucket() string + // PurgeDeletes will remove all current delete markers. + PurgeDeletes(opts ...WatchOpt) error +} + +// KeyWatcher is what is returned when doing a watch. +type KeyWatcher interface { + // Updates returns a channel to read any updates to entries. + Updates() <-chan KeyValueEntry + // Stop() will stop this watcher. + Stop() error +} + +type WatchOpt interface { + configureWatcher(opts *watchOpts) error +} + +// For nats.Context() support. +func (ctx ContextOpt) configureWatcher(opts *watchOpts) error { + opts.ctx = ctx + return nil +} + +type watchOpts struct { + ctx context.Context + // Do not send delete markers to the update channel. + ignoreDeletes bool + // Include all history per subject, not just last one. + includeHistory bool +} + +type watchOptFn func(opts *watchOpts) error + +func (opt watchOptFn) configureWatcher(opts *watchOpts) error { + return opt(opts) +} + +// IncludeHistory instructs the key watcher to include historical values as well. +func IncludeHistory() WatchOpt { + return watchOptFn(func(opts *watchOpts) error { + opts.includeHistory = true + return nil + }) +} + +// IgnoreDeletes will have the key watcher not pass any deleted keys. +func IgnoreDeletes() WatchOpt { + return watchOptFn(func(opts *watchOpts) error { + opts.ignoreDeletes = true + return nil + }) +} + +// KeyValueConfig is for configuring a KeyValue store. +type KeyValueConfig struct { + Bucket string + Description string + MaxValueSize int32 + History uint8 + TTL time.Duration + MaxBytes int64 + Storage StorageType + Replicas int +} + +// Used to watch all keys. +const ( + KeyValueMaxHistory = 64 + AllKeys = ">" + kvop = "KV-Operation" + kvdel = "DEL" + kvpurge = "PURGE" +) + +type KeyValueOp uint8 + +const ( + KeyValuePut KeyValueOp = iota + KeyValueDelete + KeyValuePurge +) + +func (op KeyValueOp) String() string { + switch op { + case KeyValuePut: + return "KeyValuePutOp" + case KeyValueDelete: + return "KeyValueDeleteOp" + case KeyValuePurge: + return "KeyValuePurgeOp" + default: + return "Unknown Operation" + } +} + +// KeyValueEntry is a retrieved entry for Get or List or Watch. +type KeyValueEntry interface { + // Bucket is the bucket the data was loaded from. + Bucket() string + // Key is the key that was retrieved. + Key() string + // Value is the retrieved value. + Value() []byte + // Revision is a unique sequence for this value. + Revision() uint64 + // Created is the time the data was put in the bucket. + Created() time.Time + // Delta is distance from the latest value. + Delta() uint64 + // Operation returns Put or Delete or Purge. + Operation() KeyValueOp +} + +// Errors +var ( + ErrKeyValueConfigRequired = errors.New("nats: config required") + ErrInvalidBucketName = errors.New("nats: invalid bucket name") + ErrInvalidKey = errors.New("nats: invalid key") + ErrBucketNotFound = errors.New("nats: bucket not found") + ErrBadBucket = errors.New("nats: bucket not valid key-value store") + ErrKeyNotFound = errors.New("nats: key not found") + ErrKeyDeleted = errors.New("nats: key was deleted") + ErrHistoryToLarge = errors.New("nats: history limited to a max of 64") + ErrNoKeysFound = errors.New("nats: no keys found") +) + +const ( + kvBucketNameTmpl = "KV_%s" + kvSubjectsTmpl = "$KV.%s.>" + kvSubjectsPreTmpl = "$KV.%s." + kvNoPending = "0" +) + +// Regex for valid keys and buckets. +var ( + validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`) + validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`) +) + +// KeyValue will lookup and bind to an existing KeyValue store. +func (js *js) KeyValue(bucket string) (KeyValue, error) { + if !js.nc.serverMinVersion(2, 6, 2) { + return nil, errors.New("nats: key-value requires at least server version 2.6.2") + } + if !validBucketRe.MatchString(bucket) { + return nil, ErrInvalidBucketName + } + stream := fmt.Sprintf(kvBucketNameTmpl, bucket) + si, err := js.StreamInfo(stream) + if err != nil { + if err == ErrStreamNotFound { + err = ErrBucketNotFound + } + return nil, err + } + // Do some quick sanity checks that this is a correctly formed stream for KV. + // Max msgs per subject should be > 0. + if si.Config.MaxMsgsPerSubject < 1 { + return nil, ErrBadBucket + } + + kv := &kvs{ + name: bucket, + stream: stream, + pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket), + js: js, + } + return kv, nil +} + +// CreateKeyValue will create a KeyValue store with the following configuration. +func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { + if !js.nc.serverMinVersion(2, 6, 2) { + return nil, errors.New("nats: key-value requires at least server version 2.6.2") + } + if cfg == nil { + return nil, ErrKeyValueConfigRequired + } + if !validBucketRe.MatchString(cfg.Bucket) { + return nil, ErrInvalidBucketName + } + if _, err := js.AccountInfo(); err != nil { + return nil, err + } + + // Default to 1 for history. Max is 64 for now. + history := int64(1) + if cfg.History > 0 { + if cfg.History > KeyValueMaxHistory { + return nil, ErrHistoryToLarge + } + history = int64(cfg.History) + } + + replicas := cfg.Replicas + if replicas == 0 { + replicas = 1 + } + + scfg := &StreamConfig{ + Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket), + Description: cfg.Description, + Subjects: []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}, + MaxMsgsPerSubject: history, + MaxBytes: cfg.MaxBytes, + MaxAge: cfg.TTL, + MaxMsgSize: cfg.MaxValueSize, + Storage: cfg.Storage, + Replicas: replicas, + AllowRollup: true, + DenyDelete: true, + } + + if _, err := js.AddStream(scfg); err != nil { + return nil, err + } + + kv := &kvs{ + name: cfg.Bucket, + stream: scfg.Name, + pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket), + js: js, + } + return kv, nil +} + +// DeleteKeyValue will delete this KeyValue store (JetStream stream). +func (js *js) DeleteKeyValue(bucket string) error { + if !validBucketRe.MatchString(bucket) { + return ErrInvalidBucketName + } + stream := fmt.Sprintf(kvBucketNameTmpl, bucket) + return js.DeleteStream(stream) +} + +type kvs struct { + name string + stream string + pre string + js *js +} + +// Underlying entry. +type kve struct { + bucket string + key string + value []byte + revision uint64 + delta uint64 + created time.Time + op KeyValueOp +} + +func (e *kve) Bucket() string { return e.bucket } +func (e *kve) Key() string { return e.key } +func (e *kve) Value() []byte { return e.value } +func (e *kve) Revision() uint64 { return e.revision } +func (e *kve) Created() time.Time { return e.created } +func (e *kve) Delta() uint64 { return e.delta } +func (e *kve) Operation() KeyValueOp { return e.op } + +func keyValid(key string) bool { + if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' { + return false + } + return validKeyRe.MatchString(key) +} + +// Get returns the latest value for the key. +func (kv *kvs) Get(key string) (KeyValueEntry, error) { + if !keyValid(key) { + return nil, ErrInvalidKey + } + + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + + m, err := kv.js.GetLastMsg(kv.stream, b.String()) + if err != nil { + if err == ErrMsgNotFound { + err = ErrKeyNotFound + } + return nil, err + } + + entry := &kve{ + bucket: kv.name, + key: key, + value: m.Data, + revision: m.Sequence, + created: m.Time, + } + + // Double check here that this is not a DEL Operation marker. + if len(m.Header) > 0 { + switch m.Header.Get(kvop) { + case kvdel: + entry.op = KeyValueDelete + return entry, ErrKeyDeleted + case kvpurge: + entry.op = KeyValuePurge + return entry, ErrKeyDeleted + } + } + + return entry, nil +} + +// Put will place the new value for the key into the store. +func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) { + if !keyValid(key) { + return 0, ErrInvalidKey + } + + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + + pa, err := kv.js.Publish(b.String(), value) + if err != nil { + return 0, err + } + return pa.Sequence, err +} + +// PutString will place the string for the key into the store. +func (kv *kvs) PutString(key string, value string) (revision uint64, err error) { + return kv.Put(key, []byte(value)) +} + +// Create will add the key/value pair iff it does not exist. +func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { + v, err := kv.Update(key, value, 0) + if err == nil { + return v, nil + } + // TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that + // so we need to double check. + if e, err := kv.Get(key); err == ErrKeyDeleted { + return kv.Update(key, value, e.Revision()) + } + return 0, err +} + +// Update will update the value iff the latest revision matches. +func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) { + if !keyValid(key) { + return 0, ErrInvalidKey + } + + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + + m := Msg{Subject: b.String(), Header: Header{}, Data: value} + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10)) + + pa, err := kv.js.PublishMsg(&m) + if err != nil { + return 0, err + } + return pa.Sequence, err +} + +// Delete will place a delete marker and leave all revisions. +func (kv *kvs) Delete(key string) error { + return kv.delete(key, false) +} + +// Purge will remove the key and all revisions. +func (kv *kvs) Purge(key string) error { + return kv.delete(key, true) +} + +func (kv *kvs) delete(key string, purge bool) error { + if !keyValid(key) { + return ErrInvalidKey + } + + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + + // DEL op marker. For watch functionality. + m := NewMsg(b.String()) + + if purge { + m.Header.Set(kvop, kvpurge) + m.Header.Set(MsgRollup, MsgRollupSubject) + } else { + m.Header.Set(kvop, kvdel) + } + _, err := kv.js.PublishMsg(m) + return err +} + +// PurgeDeletes will remove all current delete markers. +// This is a maintenance option if there is a larger buildup of delete markers. +func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error { + watcher, err := kv.WatchAll(opts...) + if err != nil { + return err + } + defer watcher.Stop() + + for entry := range watcher.Updates() { + if entry == nil { + break + } + if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge { + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(entry.Key()) + err := kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String()}) + if err != nil { + return err + } + } + } + return nil +} + +// Keys() will return all keys. +func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) { + opts = append(opts, IgnoreDeletes()) + watcher, err := kv.WatchAll(opts...) + if err != nil { + return nil, err + } + defer watcher.Stop() + + var keys []string + for entry := range watcher.Updates() { + if entry == nil { + break + } + keys = append(keys, entry.Key()) + } + if len(keys) == 0 { + return nil, ErrNoKeysFound + } + return keys, nil +} + +// History will return all values for the key. +func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) { + opts = append(opts, IncludeHistory()) + watcher, err := kv.Watch(key, opts...) + if err != nil { + return nil, err + } + defer watcher.Stop() + + var entries []KeyValueEntry + for entry := range watcher.Updates() { + if entry == nil { + break + } + entries = append(entries, entry) + } + if len(entries) == 0 { + return nil, ErrKeyNotFound + } + return entries, nil +} + +// Implementation for Watch +type watcher struct { + updates chan KeyValueEntry + sub *Subscription +} + +// Updates returns the interior channel. +func (w *watcher) Updates() <-chan KeyValueEntry { + if w == nil { + return nil + } + return w.updates +} + +// Stop will unsubscribe from the watcher. +func (w *watcher) Stop() error { + if w == nil { + return nil + } + return w.sub.Unsubscribe() +} + +// WatchAll watches all keys. +func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) { + return kv.Watch(AllKeys, opts...) +} + +// Watch will fire the callback when a key that matches the keys pattern is updated. +// keys needs to be a valid NATS subject. +func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { + var o watchOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureWatcher(&o); err != nil { + return nil, err + } + } + } + + var initDoneMarker bool + + // Could be a pattern so don't check for validity as we normally do. + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(keys) + keys = b.String() + + w := &watcher{updates: make(chan KeyValueEntry, 32)} + + update := func(m *Msg) { + tokens, err := getMetadataFields(m.Reply) + if err != nil { + return + } + if len(m.Subject) <= len(kv.pre) { + return + } + subj := m.Subject[len(kv.pre):] + + var op KeyValueOp + if len(m.Header) > 0 { + switch m.Header.Get(kvop) { + case kvdel: + op = KeyValueDelete + case kvpurge: + op = KeyValuePurge + } + } + delta := uint64(parseNum(tokens[ackNumPendingTokenPos])) + entry := &kve{ + bucket: kv.name, + key: subj, + value: m.Data, + revision: uint64(parseNum(tokens[ackStreamSeqTokenPos])), + created: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])), + delta: delta, + op: op, + } + if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) { + w.updates <- entry + } + // Check if done initial values. + if !initDoneMarker && delta == 0 { + initDoneMarker = true + w.updates <- nil + } + } + + // Check if we have anything pending. + _, err := kv.js.GetLastMsg(kv.stream, keys) + if err == ErrMsgNotFound { + initDoneMarker = true + w.updates <- nil + } + + // Used ordered consumer to deliver results. + subOpts := []SubOpt{OrderedConsumer()} + if !o.includeHistory { + subOpts = append(subOpts, DeliverLastPerSubject()) + } + sub, err := kv.js.Subscribe(keys, update, subOpts...) + if err != nil { + return nil, err + } + w.sub = sub + return w, nil +} + +// Bucket returns the current bucket name (JetStream stream). +func (kv *kvs) Bucket() string { + return kv.name +} diff --git a/nats.go b/nats.go index 95a5577df..2fa5d6c15 100644 --- a/nats.go +++ b/nats.go @@ -32,6 +32,7 @@ import ( "net/url" "os" "path/filepath" + "regexp" "runtime" "strconv" "strings" @@ -155,6 +156,7 @@ var ( ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer") ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer") ErrConsumerNotActive = errors.New("nats: consumer not active") + ErrMsgNotFound = errors.New("nats: message not found") ) func init() { @@ -677,6 +679,7 @@ type serverInfo struct { ID string `json:"server_id"` Name string `json:"server_name"` Proto int `json:"proto"` + Version string `json:"version"` Host string `json:"host"` Port int `json:"port"` Headers bool `json:"headers"` @@ -1834,6 +1837,52 @@ func (nc *Conn) ConnectedServerName() string { return nc.info.Name } +var semVerRe = regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`) + +func versionComponents(version string) (major, minor, patch int, err error) { + m := semVerRe.FindStringSubmatch(version) + if m == nil { + return 0, 0, 0, errors.New("invalid semver") + } + major, err = strconv.Atoi(m[1]) + if err != nil { + return -1, -1, -1, err + } + minor, err = strconv.Atoi(m[2]) + if err != nil { + return -1, -1, -1, err + } + patch, err = strconv.Atoi(m[3]) + if err != nil { + return -1, -1, -1, err + } + return major, minor, patch, err +} + +// Check for mininum server requirement. +func (nc *Conn) serverMinVersion(major, minor, patch int) bool { + smajor, sminor, spatch, _ := versionComponents(nc.ConnectedServerVersion()) + if smajor < major || (smajor == major && sminor < minor) || (smajor == major && sminor == minor && spatch < patch) { + return false + } + return true +} + +// ConnectedServerVersion reports the connected server's version as a string +func (nc *Conn) ConnectedServerVersion() string { + if nc == nil { + return _EMPTY_ + } + + nc.mu.RLock() + defer nc.mu.RUnlock() + + if nc.status != CONNECTED { + return _EMPTY_ + } + return nc.info.Version +} + // ConnectedClusterName reports the connected server's cluster name if any func (nc *Conn) ConnectedClusterName() string { if nc == nil { @@ -2981,7 +3030,7 @@ func (nc *Conn) processInfo(info string) error { if info == _EMPTY_ { return nil } - ncInfo := serverInfo{} + var ncInfo serverInfo if err := json.Unmarshal([]byte(info), &ncInfo); err != nil { return err } diff --git a/norace_test.go b/norace_test.go index 7d16687be..24938ace3 100644 --- a/norace_test.go +++ b/norace_test.go @@ -705,7 +705,7 @@ func TestJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) { // Cause bottleneck by having channel block when full // because of work taking long. recvd <- msg - }, EnableFlowControl()) + }, EnableFlowControl(), IdleHeartbeat(5*time.Second)) if err != nil { t.Fatal(err) diff --git a/object.go b/object.go new file mode 100644 index 000000000..13dd7b280 --- /dev/null +++ b/object.go @@ -0,0 +1,928 @@ +// Copyright 2021 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nats + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "strings" + "sync" + "time" + + "github.com/nats-io/nuid" +) + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. +type ObjectStoreManager interface { + // ObjectStore will lookup and bind to an existing object store instance. + ObjectStore(bucket string) (ObjectStore, error) + // CreateObjectStore will create an object store. + CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) + // DeleteObjectStore will delete the underlying stream for the named object. + DeleteObjectStore(bucket string) error +} + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. +type ObjectStore interface { + // Put will place the contents from the reader into a new object. + Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) + // Get will pull the named object from the object store. + Get(name string, opts ...ObjectOpt) (ObjectResult, error) + + // PutBytes is convenience function to put a byte slice into this object store. + PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) + // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. + GetBytes(name string, opts ...ObjectOpt) ([]byte, error) + + // PutBytes is convenience function to put a string into this object store. + PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) + // GetString is a convenience function to pull an object from this object store and return it as a string. + GetString(name string, opts ...ObjectOpt) (string, error) + + // PutFile is convenience function to put a file into this object store. + PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) + // GetFile is a convenience function to pull an object from this object store and place it in a file. + GetFile(name, file string, opts ...ObjectOpt) error + + // GetInfo will retrieve the current information for the object. + GetInfo(name string) (*ObjectInfo, error) + // UpdateMeta will update the meta data for the object. + UpdateMeta(name string, meta *ObjectMeta) error + + // Delete will delete the named object. + Delete(name string) error + + // AddLink will add a link to another object into this object store. + AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) + + // AddBucketLink will add a link to another object store. + AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error) + + // Seal will seal the object store, no further modifications will be allowed. + Seal() error + + // Watch for changes in the underlying store and receive meta information updates. + Watch(opts ...WatchOpt) (ObjectWatcher, error) + + // List will list all the objects in this store. + List(opts ...WatchOpt) ([]*ObjectInfo, error) +} + +type ObjectOpt interface { + configureObject(opts *objOpts) error +} + +type objOpts struct { + ctx context.Context +} + +// For nats.Context() support. +func (ctx ContextOpt) configureObject(opts *objOpts) error { + opts.ctx = ctx + return nil +} + +// ObjectWatcher is what is returned when doing a watch. +type ObjectWatcher interface { + // Updates returns a channel to read any updates to entries. + Updates() <-chan *ObjectInfo + // Stop() will stop this watcher. + Stop() error +} + +var ( + ErrObjectConfigRequired = errors.New("nats: object-store config required") + ErrBadObjectMeta = errors.New("nats: object-store meta information invalid") + ErrObjectNotFound = errors.New("nats: object not found") + ErrInvalidStoreName = errors.New("nats: invalid object-store name") + ErrInvalidObjectName = errors.New("nats: invalid object name") + ErrDigestMismatch = errors.New("nats: received a corrupt object, digests do not match") + ErrNoObjectsFound = errors.New("nats: no objects found") +) + +// ObjectStoreConfig is the config for the object store. +type ObjectStoreConfig struct { + Bucket string + Description string + TTL time.Duration + Storage StorageType + Replicas int +} + +// ObjectMetaOptions +type ObjectMetaOptions struct { + Link *ObjectLink `json:"link,omitempty"` + ChunkSize uint32 `json:"max_chunk_size,omitempty"` +} + +// ObjectMeta is high level information about an object. +type ObjectMeta struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Headers Header `json:"headers,omitempty"` + + // Optional options. + Opts *ObjectMetaOptions `json:"options,omitempty"` +} + +// ObjectInfo is meta plus instance information. +type ObjectInfo struct { + ObjectMeta + Bucket string `json:"bucket"` + NUID string `json:"nuid"` + Size uint64 `json:"size"` + ModTime time.Time `json:"mtime"` + Chunks uint32 `json:"chunks"` + Digest string `json:"digest,omitempty"` + Deleted bool `json:"deleted,omitempty"` +} + +// ObjectLink is used to embed links to other buckets and objects. +type ObjectLink struct { + // Bucket is the name of the other object store. + Bucket string `json:"bucket"` + // Name can be used to link to a single object. + // If empty means this is a link to the whole store, like a directory. + Name string `json:"name,omitempty"` +} + +// ObjectResult will return the underlying stream info and also be an io.ReadCloser. +type ObjectResult interface { + io.ReadCloser + Info() (*ObjectInfo, error) + Error() error +} + +const ( + objNameTmpl = "OBJ_%s" + objSubjectsPre = "$O." + objAllChunksPreTmpl = "$O.%s.C.>" + objAllMetaPreTmpl = "$O.%s.M.>" + objChunksPreTmpl = "$O.%s.C.%s" + objMetaPreTmpl = "$O.%s.M.%s" + objNoPending = "0" + objDefaultChunkSize = uint32(128 * 1024) // 128k + objDigestType = "sha-256=" + objDigestTmpl = objDigestType + "%s" +) + +type obs struct { + name string + stream string + js *js +} + +// CreateObjectStore will create an object store. +func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) { + if !js.nc.serverMinVersion(2, 6, 2) { + return nil, errors.New("nats: object-store requires at least server version 2.6.2") + } + if cfg == nil { + return nil, ErrObjectConfigRequired + } + if !validBucketRe.MatchString(cfg.Bucket) { + return nil, ErrInvalidStoreName + } + + name := cfg.Bucket + chunks := fmt.Sprintf(objAllChunksPreTmpl, name) + meta := fmt.Sprintf(objAllMetaPreTmpl, name) + + scfg := &StreamConfig{ + Name: fmt.Sprintf(objNameTmpl, name), + Description: cfg.Description, + Subjects: []string{chunks, meta}, + MaxAge: cfg.TTL, + Storage: cfg.Storage, + Replicas: cfg.Replicas, + Discard: DiscardNew, + AllowRollup: true, + } + + // Create our stream. + _, err := js.AddStream(scfg) + if err != nil { + return nil, err + } + + return &obs{name: name, stream: scfg.Name, js: js}, nil +} + +// ObjectStore will lookup and bind to an existing object store instance. +func (js *js) ObjectStore(bucket string) (ObjectStore, error) { + if !validBucketRe.MatchString(bucket) { + return nil, ErrInvalidStoreName + } + if !js.nc.serverMinVersion(2, 6, 2) { + return nil, errors.New("nats: key-value requires at least server version 2.6.2") + } + + stream := fmt.Sprintf(objNameTmpl, bucket) + si, err := js.StreamInfo(stream) + if err != nil { + return nil, err + } + return &obs{name: bucket, stream: si.Config.Name, js: js}, nil +} + +// DeleteObjectStore will delete the underlying stream for the named object. +func (js *js) DeleteObjectStore(bucket string) error { + stream := fmt.Sprintf(objNameTmpl, bucket) + return js.DeleteStream(stream) +} + +func sanitizeName(name string) string { + stream := strings.ReplaceAll(name, ".", "_") + return strings.ReplaceAll(stream, " ", "_") +} + +// Put will place the contents from the reader into this object-store. +func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) { + if meta == nil { + return nil, ErrBadObjectMeta + } + + obj := sanitizeName(meta.Name) + if !keyValid(obj) { + return nil, ErrInvalidObjectName + } + + var o objOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureObject(&o); err != nil { + return nil, err + } + } + } + ctx := o.ctx + + // Grab existing meta info. + einfo, err := obs.GetInfo(meta.Name) + if err != nil && err != ErrObjectNotFound { + return nil, err + } + + // Create a random subject prefixed with the object stream name. + id := nuid.Next() + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, id) + metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, obj) + + // For async error handling + var perr error + var mu sync.Mutex + setErr := func(err error) { + mu.Lock() + defer mu.Unlock() + perr = err + } + getErr := func() error { + mu.Lock() + defer mu.Unlock() + return perr + } + + purgePartial := func() { obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) } + + // Create our own JS context to handle errors etc. + js, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) })) + if err != nil { + return nil, err + } + + chunkSize := objDefaultChunkSize + if meta.Opts != nil && meta.Opts.ChunkSize > 0 { + chunkSize = meta.Opts.ChunkSize + } + + m, h := NewMsg(chunkSubj), sha256.New() + chunk, sent, total := make([]byte, chunkSize), 0, uint64(0) + info := &ObjectInfo{Bucket: obs.name, NUID: id, ObjectMeta: *meta} + + for r != nil { + if ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + err = ctx.Err() + } else { + err = ErrTimeout + } + default: + } + if err != nil { + purgePartial() + return nil, err + } + } + + // Actual read. + // TODO(dlc) - Deadline? + n, err := r.Read(chunk) + + // EOF Processing. + if err == io.EOF { + // Finalize sha. + sha := h.Sum(nil) + // Place meta info. + info.Size, info.Chunks = uint64(total), uint32(sent) + info.Digest = fmt.Sprintf(objDigestTmpl, base64.URLEncoding.EncodeToString(sha[:])) + break + } else if err != nil { + purgePartial() + return nil, err + } + + // Chunk processing. + m.Data = chunk[:n] + h.Write(m.Data) + + // Send msg itself. + if _, err := js.PublishMsgAsync(m); err != nil { + purgePartial() + return nil, err + } + if err := getErr(); err != nil { + purgePartial() + return nil, err + } + // Update totals. + sent++ + total += uint64(n) + } + + // Publish the metadata. + mm := NewMsg(metaSubj) + mm.Header.Set(MsgRollup, MsgRollupSubject) + mm.Data, err = json.Marshal(info) + if err != nil { + if r != nil { + purgePartial() + } + return nil, err + } + // Send meta message. + _, err = js.PublishMsgAsync(mm) + if err != nil { + if r != nil { + purgePartial() + } + return nil, err + } + + // Wait for all to be processed. + select { + case <-js.PublishAsyncComplete(): + if err := getErr(); err != nil { + purgePartial() + return nil, err + } + case <-time.After(obs.js.opts.wait): + return nil, ErrTimeout + } + info.ModTime = time.Now().UTC() + + // Delete any original one. + if einfo != nil && !einfo.Deleted { + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID) + obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) + } + + return info, nil +} + +// ObjectResult impl. +type objResult struct { + sync.Mutex + info *ObjectInfo + r io.ReadCloser + err error + ctx context.Context +} + +func (info *ObjectInfo) isLink() bool { + return info.ObjectMeta.Opts != nil && info.ObjectMeta.Opts.Link != nil +} + +// GetObject will pull the object from the underlying stream. +func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) { + // Grab meta info. + info, err := obs.GetInfo(name) + if err != nil { + return nil, err + } + if info.NUID == _EMPTY_ { + return nil, ErrBadObjectMeta + } + + // Check for object links.If single objects we do a pass through. + if info.isLink() { + if info.ObjectMeta.Opts.Link.Name == _EMPTY_ { + return nil, errors.New("nats: link is a bucket") + } + lobs, err := obs.js.ObjectStore(info.ObjectMeta.Opts.Link.Bucket) + if err != nil { + return nil, err + } + return lobs.Get(info.ObjectMeta.Opts.Link.Name) + } + + var o objOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureObject(&o); err != nil { + return nil, err + } + } + } + ctx := o.ctx + + result := &objResult{info: info, ctx: ctx} + if info.Size == 0 { + return result, nil + } + + pr, pw := net.Pipe() + result.r = pr + + gotErr := func(m *Msg, err error) { + pw.Close() + m.Sub.Unsubscribe() + result.setErr(err) + } + + // For calculating sum256 + h := sha256.New() + + processChunk := func(m *Msg) { + if ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + err = ctx.Err() + } else { + err = ErrTimeout + } + default: + } + if err != nil { + gotErr(m, err) + return + } + } + + tokens, err := getMetadataFields(m.Reply) + if err != nil { + gotErr(m, err) + return + } + + // Write to our pipe. + for b := m.Data; len(b) > 0; { + n, err := pw.Write(b) + if err != nil { + gotErr(m, err) + return + } + b = b[n:] + } + // Update sha256 + h.Write(m.Data) + + // Check if we are done. + if tokens[ackNumPendingTokenPos] == objNoPending { + pw.Close() + m.Sub.Unsubscribe() + + // Make sure the digest matches. + sha := h.Sum(nil) + rsha, err := base64.URLEncoding.DecodeString(info.Digest) + if err != nil { + gotErr(m, err) + return + } + if !bytes.Equal(sha[:], rsha) { + gotErr(m, ErrDigestMismatch) + return + } + } + } + + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) + _, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer()) + if err != nil { + return nil, err + } + + return result, nil +} + +// Delete will delete the object. +func (obs *obs) Delete(name string) error { + // Grab meta info. + info, err := obs.GetInfo(name) + if err != nil { + return err + } + if info.NUID == _EMPTY_ { + return ErrBadObjectMeta + } + + // Place a rollup delete marker. + info.Deleted = true + info.Size, info.Chunks, info.Digest = 0, 0, _EMPTY_ + + metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, sanitizeName(name)) + mm := NewMsg(metaSubj) + mm.Data, err = json.Marshal(info) + if err != nil { + return err + } + mm.Header.Set(MsgRollup, MsgRollupSubject) + _, err = obs.js.PublishMsg(mm) + if err != nil { + return err + } + + // Purge chunks for the object. + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) + return obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) +} + +// AddLink will add a link to another object into this object store. +func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) { + if obj == nil { + return nil, errors.New("nats: object required") + } + if obj.Deleted { + return nil, errors.New("nats: object is deleted") + } + name = sanitizeName(name) + if !keyValid(name) { + return nil, ErrInvalidObjectName + } + + // Same object store. + if obj.Bucket == obs.name { + info := *obj + info.Name = name + if err := obs.UpdateMeta(obj.Name, &info.ObjectMeta); err != nil { + return nil, err + } + return obs.GetInfo(name) + } + + link := &ObjectLink{Bucket: obj.Bucket, Name: obj.Name} + meta := &ObjectMeta{ + Name: name, + Opts: &ObjectMetaOptions{Link: link}, + } + return obs.Put(meta, nil) +} + +// AddBucketLink will add a link to another object store. +func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error) { + if bucket == nil { + return nil, errors.New("nats: bucket required") + } + name = sanitizeName(name) + if !keyValid(name) { + return nil, ErrInvalidObjectName + } + + bos, ok := bucket.(*obs) + if !ok { + return nil, errors.New("nats: bucket malformed") + } + meta := &ObjectMeta{ + Name: name, + Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}}, + } + return ob.Put(meta, nil) +} + +// PutBytes is convenience function to put a byte slice into this object store. +func (obs *obs) PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) { + return obs.Put(&ObjectMeta{Name: name}, bytes.NewReader(data), opts...) +} + +// GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. +func (obs *obs) GetBytes(name string, opts ...ObjectOpt) ([]byte, error) { + result, err := obs.Get(name, opts...) + if err != nil { + return nil, err + } + defer result.Close() + + var b bytes.Buffer + if _, err := b.ReadFrom(result); err != nil { + return nil, err + } + return b.Bytes(), nil +} + +// PutBytes is convenience function to put a string into this object store. +func (obs *obs) PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) { + return obs.Put(&ObjectMeta{Name: name}, strings.NewReader(data), opts...) +} + +// GetString is a convenience function to pull an object from this object store and return it as a string. +func (obs *obs) GetString(name string, opts ...ObjectOpt) (string, error) { + result, err := obs.Get(name, opts...) + if err != nil { + return _EMPTY_, err + } + defer result.Close() + + var b bytes.Buffer + if _, err := b.ReadFrom(result); err != nil { + return _EMPTY_, err + } + return b.String(), nil +} + +// PutFile is convenience function to put a file into an object store. +func (obs *obs) PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) { + f, err := os.Open(file) + if err != nil { + return nil, err + } + defer f.Close() + return obs.Put(&ObjectMeta{Name: file}, f, opts...) +} + +// GetFile is a convenience function to pull and object and place in a file. +func (obs *obs) GetFile(name, file string, opts ...ObjectOpt) error { + // Expect file to be new. + f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return err + } + defer f.Close() + + result, err := obs.Get(name, opts...) + if err != nil { + os.Remove(f.Name()) + return err + } + defer result.Close() + + // Stream copy to the file. + _, err = io.Copy(f, result) + return err +} + +// GetInfo will retrieve the current information for the object. +func (obs *obs) GetInfo(name string) (*ObjectInfo, error) { + // Lookup the stream to get the bound subject. + obj := sanitizeName(name) + if !keyValid(obj) { + return nil, ErrInvalidObjectName + } + + // Grab last meta value we have. + meta := fmt.Sprintf(objMetaPreTmpl, obs.name, obj) + stream := fmt.Sprintf(objNameTmpl, obs.name) + + m, err := obs.js.GetLastMsg(stream, meta) + if err != nil { + if err == ErrMsgNotFound { + err = ErrObjectNotFound + } + return nil, err + } + var info ObjectInfo + if err := json.Unmarshal(m.Data, &info); err != nil { + return nil, ErrBadObjectMeta + } + info.ModTime = m.Time + return &info, nil +} + +// UpdateMeta will update the meta data for the object. +func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { + if meta == nil { + return ErrBadObjectMeta + } + // Grab meta info. + info, err := obs.GetInfo(name) + if err != nil { + return err + } + // Copy new meta + info.ObjectMeta = *meta + mm := NewMsg(fmt.Sprintf(objMetaPreTmpl, obs.name, sanitizeName(meta.Name))) + mm.Data, err = json.Marshal(info) + if err != nil { + return err + } + _, err = obs.js.PublishMsg(mm) + return err +} + +// Seal will seal the object store, no further modifications will be allowed. +func (obs *obs) Seal() error { + stream := fmt.Sprintf(objNameTmpl, obs.name) + si, err := obs.js.StreamInfo(stream) + if err != nil { + return err + } + // Seal the stream from being able to take on more messages. + cfg := si.Config + cfg.Sealed = true + _, err = obs.js.UpdateStream(&cfg) + return err +} + +// Implementation for Watch +type objWatcher struct { + updates chan *ObjectInfo + sub *Subscription +} + +// Updates returns the interior channel. +func (w *objWatcher) Updates() <-chan *ObjectInfo { + if w == nil { + return nil + } + return w.updates +} + +// Stop will unsubscribe from the watcher. +func (w *objWatcher) Stop() error { + if w == nil { + return nil + } + return w.sub.Unsubscribe() +} + +// Watch for changes in the underlying store and receive meta information updates. +func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) { + var o watchOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureWatcher(&o); err != nil { + return nil, err + } + } + } + + var initDoneMarker bool + + w := &objWatcher{updates: make(chan *ObjectInfo, 32)} + + update := func(m *Msg) { + var info ObjectInfo + if err := json.Unmarshal(m.Data, &info); err != nil { + return // TODO(dlc) - Communicate this upwards? + } + meta, err := m.Metadata() + if err != nil { + return + } + + if !o.ignoreDeletes || !info.Deleted { + info.ModTime = meta.Timestamp + w.updates <- &info + } + + if !initDoneMarker && meta.NumPending == 0 { + initDoneMarker = true + w.updates <- nil + } + } + + allMeta := fmt.Sprintf(objAllMetaPreTmpl, obs.name) + _, err := obs.js.GetLastMsg(obs.stream, allMeta) + if err == ErrMsgNotFound { + initDoneMarker = true + w.updates <- nil + } + + // Used ordered consumer to deliver results. + subOpts := []SubOpt{OrderedConsumer()} + if !o.includeHistory { + subOpts = append(subOpts, DeliverLastPerSubject()) + } + sub, err := obs.js.Subscribe(allMeta, update, subOpts...) + if err != nil { + return nil, err + } + w.sub = sub + return w, nil +} + +// List will list all the objects in this store. +func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) { + opts = append(opts, IgnoreDeletes()) + watcher, err := obs.Watch(opts...) + if err != nil { + return nil, err + } + defer watcher.Stop() + + var objs []*ObjectInfo + for entry := range watcher.Updates() { + if entry == nil { + break + } + objs = append(objs, entry) + } + if len(objs) == 0 { + return nil, ErrNoObjectsFound + } + return objs, nil +} + +// Read impl. +func (o *objResult) Read(p []byte) (n int, err error) { + o.Lock() + defer o.Unlock() + if ctx := o.ctx; ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + o.err = ctx.Err() + } else { + o.err = ErrTimeout + } + default: + } + } + if o.err != nil { + return 0, err + } + if o.r == nil { + return 0, io.EOF + } + + r := o.r.(net.Conn) + r.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + n, err = r.Read(p) + if err, ok := err.(net.Error); ok && err.Timeout() { + if ctx := o.ctx; ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + return 0, ctx.Err() + } else { + return 0, ErrTimeout + } + default: + err = nil + } + } + } + return n, err +} + +// Close impl. +func (o *objResult) Close() error { + o.Lock() + defer o.Unlock() + if o.r == nil { + return nil + } + return o.r.Close() +} + +func (o *objResult) setErr(err error) { + o.Lock() + defer o.Unlock() + o.err = err +} + +func (o *objResult) Info() (*ObjectInfo, error) { + o.Lock() + defer o.Unlock() + return o.info, o.err +} + +func (o *objResult) Error() error { + o.Lock() + defer o.Unlock() + return o.err +} diff --git a/test/js_test.go b/test/js_test.go index 7482b3f07..f2e850d96 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1604,7 +1604,7 @@ func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) { // Try to fetch the same message which should be gone. _, err = js.GetMsg("foo", originalSeq) - if err == nil || err.Error() != `no message found` { + if err == nil || err != nats.ErrMsgNotFound { t.Errorf("Expected no message found error, got: %v", err) } }) diff --git a/test/kv_test.go b/test/kv_test.go new file mode 100644 index 000000000..01934ba48 --- /dev/null +++ b/test/kv_test.go @@ -0,0 +1,463 @@ +// Copyright 2021 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "fmt" + "os" + "reflect" + "strconv" + "strings" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" +) + +func TestKeyValueBasics(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"}) + expectOk(t, err) + + if kv.Bucket() != "TEST" { + t.Fatalf("Expected bucket name to be %q, got %q", "TEST", kv.Bucket()) + } + + // Simple Put + r, err := kv.Put("name", []byte("derek")) + expectOk(t, err) + if r != 1 { + t.Fatalf("Expected 1 for the revision, got %d", r) + } + // Simple Get + e, err := kv.Get("name") + expectOk(t, err) + if string(e.Value()) != "derek" { + t.Fatalf("Got wrong value: %q vs %q", e.Value(), "derek") + } + if e.Revision() != 1 { + t.Fatalf("Expected 1 for the revision, got %d", e.Revision()) + } + + // Delete + err = kv.Delete("name") + expectOk(t, err) + _, err = kv.Get("name") + expectErr(t, err, nats.ErrKeyDeleted) + r, err = kv.Create("name", []byte("derek")) + expectOk(t, err) + if r != 3 { + t.Fatalf("Expected 3 for the revision, got %d", r) + } + + // Conditional Updates. + r, err = kv.Update("name", []byte("rip"), 3) + expectOk(t, err) + _, err = kv.Update("name", []byte("ik"), 3) + expectErr(t, err) + _, err = kv.Update("name", []byte("ik"), r) + expectOk(t, err) + r, err = kv.Create("age", []byte("22")) + expectOk(t, err) + _, err = kv.Update("age", []byte("33"), r) + expectOk(t, err) +} + +func TestKeyValueHistory(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "LIST", History: 10}) + expectOk(t, err) + + for i := 0; i < 50; i++ { + age := strconv.FormatUint(uint64(i+22), 10) + _, err := kv.Put("age", []byte(age)) + expectOk(t, err) + } + + vl, err := kv.History("age") + expectOk(t, err) + + if len(vl) != 10 { + t.Fatalf("Expected %d values, got %d", 10, len(vl)) + } + for i, v := range vl { + if v.Key() != "age" { + t.Fatalf("Expected key of %q, got %q", "age", v.Key()) + } + if v.Revision() != uint64(i+41) { + // History of 10, sent 50.. + t.Fatalf("Expected revision of %d, got %d", i+41, v.Revision()) + } + age, err := strconv.Atoi(string(v.Value())) + expectOk(t, err) + if age != i+62 { + t.Fatalf("Expected data value of %d, got %d", i+22, age) + } + } +} + +func TestKeyValueWatch(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) + expectOk(t, err) + + watcher, err := kv.WatchAll() + expectOk(t, err) + defer watcher.Stop() + + expectUpdate := func(key, value string, revision uint64) { + t.Helper() + select { + case v := <-watcher.Updates(): + if v.Key() != key || string(v.Value()) != value || v.Revision() != revision { + t.Fatalf("Did not get expected: %+v vs %q %q %d", v, key, value, revision) + } + case <-time.After(time.Second): + t.Fatalf("Did not receive an update like expected") + } + } + expectDelete := func(key string, revision uint64) { + t.Helper() + select { + case v := <-watcher.Updates(): + if v.Operation() != nats.KeyValueDelete { + t.Fatalf("Expected a delete operation but got %+v", v) + } + if v.Revision() != revision { + t.Fatalf("Did not get expected revision: %d vs %d", revision, v.Revision()) + } + case <-time.After(time.Second): + t.Fatalf("Did not receive an update like expected") + } + } + expectInitDone := func() { + t.Helper() + select { + case v := <-watcher.Updates(): + if v != nil { + t.Fatalf("Did not get expected: %+v", v) + } + case <-time.After(time.Second): + t.Fatalf("Did not receive a init done like expected") + } + } + + // Make sure we already got an initial value marker. + expectInitDone() + + kv.Create("name", []byte("derek")) + expectUpdate("name", "derek", 1) + kv.Put("name", []byte("rip")) + expectUpdate("name", "rip", 2) + kv.Put("name", []byte("ik")) + expectUpdate("name", "ik", 3) + kv.Put("age", []byte("22")) + expectUpdate("age", "22", 4) + kv.Put("age", []byte("33")) + expectUpdate("age", "33", 5) + kv.Delete("age") + expectDelete("age", 6) + + // Stop first watcher. + watcher.Stop() + + // Now try wildcard matching and make sure we only get last value when starting. + kv.Put("t.name", []byte("rip")) + kv.Put("t.name", []byte("ik")) + kv.Put("t.age", []byte("22")) + kv.Put("t.age", []byte("44")) + + watcher, err = kv.Watch("t.*") + expectOk(t, err) + defer watcher.Stop() + + expectUpdate("t.name", "ik", 8) + expectUpdate("t.age", "44", 10) + expectInitDone() +} + +func TestKeyValueBindStore(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) + expectOk(t, err) + + // Now bind to it.. + _, err = js.KeyValue("WATCH") + expectOk(t, err) + + // Make sure we can't bind to a non-kv style stream. + // We have some protection with stream name prefix. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "KV_TEST", + Subjects: []string{"foo"}, + }) + expectOk(t, err) + + _, err = js.KeyValue("TEST") + expectErr(t, err) + if err != nats.ErrBadBucket { + t.Fatalf("Expected %v but got %v", nats.ErrBadBucket, err) + } +} + +func TestKeyValueDeleteStore(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) + expectOk(t, err) + + err = js.DeleteKeyValue("WATCH") + expectOk(t, err) + + _, err = js.KeyValue("WATCH") + expectErr(t, err) +} + +func TestKeyValueDeleteVsPurge(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10}) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(key, []byte(value)) + expectOk(t, err) + } + + // Put in a few names and ages. + put("name", "derek") + put("age", "22") + put("name", "ivan") + put("age", "33") + put("name", "rip") + put("age", "44") + + kv.Delete("age") + entries, err := kv.History("age") + expectOk(t, err) + // Expect three entries and delete marker. + if len(entries) != 4 { + t.Fatalf("Expected 4 entries for age after delete, got %d", len(entries)) + } + err = kv.Purge("name") + expectOk(t, err) + // Check marker + e, err := kv.Get("name") + expectErr(t, err, nats.ErrKeyDeleted) + // Also make sure op is purge + if e.Operation() != nats.KeyValuePurge { + t.Fatalf("Expected a purge operation but got %v", e.Operation()) + } + entries, err = kv.History("name") + expectOk(t, err) + if len(entries) != 1 { + t.Fatalf("Expected only 1 entry for age after delete, got %d", len(entries)) + } + // Make sure history also reports the purge operation. + if e := entries[0]; e.Operation() != nats.KeyValuePurge { + t.Fatalf("Expected a purge operation but got %v", e.Operation()) + } +} + +func TestKeyValueDeleteTombstones(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10}) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(key, []byte(value)) + expectOk(t, err) + } + + v := strings.Repeat("ABC", 33) + for i := 1; i <= 100; i++ { + put(fmt.Sprintf("key-%d", i), v) + } + // Now delete them. + for i := 1; i <= 100; i++ { + err := kv.Delete(fmt.Sprintf("key-%d", i)) + expectOk(t, err) + } + // Now cleanup. + err = kv.PurgeDeletes() + expectOk(t, err) + + si, err := js.StreamInfo("KV_KVS") + expectOk(t, err) + if si.State.Msgs != 0 { + t.Fatalf("Expected no stream msgs to be left, got %d", si.State.Msgs) + } +} + +func TestKeyValueKeys(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 2}) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(key, []byte(value)) + expectOk(t, err) + } + + _, err = kv.Keys() + expectErr(t, err, nats.ErrNoKeysFound) + + // Put in a few names and ages. + put("name", "derek") + put("age", "22") + put("country", "US") + put("name", "ivan") + put("age", "33") + put("country", "US") + put("name", "rip") + put("age", "44") + put("country", "MT") + + keys, err := kv.Keys() + expectOk(t, err) + + kmap := make(map[string]struct{}) + for _, key := range keys { + if _, ok := kmap[key]; ok { + t.Fatalf("Already saw %q", key) + } + kmap[key] = struct{}{} + } + if len(kmap) != 3 { + t.Fatalf("Expected 3 total keys, got %d", len(kmap)) + } + expected := map[string]struct{}{ + "name": struct{}{}, + "age": struct{}{}, + "country": struct{}{}, + } + if !reflect.DeepEqual(kmap, expected) { + t.Fatalf("Expected %+v but got %+v", expected, kmap) + } + // Make sure delete and purge do the right thing and not return the keys. + err = kv.Delete("name") + expectOk(t, err) + err = kv.Purge("country") + expectOk(t, err) + + keys, err = kv.Keys() + expectOk(t, err) + + kmap = make(map[string]struct{}) + for _, key := range keys { + if _, ok := kmap[key]; ok { + t.Fatalf("Already saw %q", key) + } + kmap[key] = struct{}{} + } + if len(kmap) != 1 { + t.Fatalf("Expected 1 total key, got %d", len(kmap)) + } + if _, ok := kmap["age"]; !ok { + t.Fatalf("Expected %q to be only key present", "age") + } +} + +// Helpers + +func client(t *testing.T, s *server.Server) *nats.Conn { + t.Helper() + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + return nc +} + +func jsClient(t *testing.T, s *server.Server) (*nats.Conn, nats.JetStreamContext) { + t.Helper() + nc := client(t, s) + js, err := nc.JetStream(nats.MaxWait(10 * time.Second)) + if err != nil { + t.Fatalf("Unexpected error getting JetStream context: %v", err) + } + return nc, js +} + +func shutdown(s *server.Server) { + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + s.Shutdown() +} + +func expectOk(t *testing.T, err error) { + t.Helper() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } +} + +func expectErr(t *testing.T, err error, expected ...error) { + t.Helper() + if err == nil { + t.Fatalf("Expected error but got none") + } + if len(expected) == 0 { + return + } + for _, e := range expected { + if err == e || strings.Contains(e.Error(), err.Error()) { + return + } + } + t.Fatalf("Expected one of %+v, got '%v'", expected, err) +} diff --git a/test/norace_test.go b/test/norace_test.go new file mode 100644 index 000000000..5b82712ed --- /dev/null +++ b/test/norace_test.go @@ -0,0 +1,81 @@ +// Copyright 2019-2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !race + +package test + +import ( + "context" + "crypto/rand" + "io" + "testing" + "time" + + "github.com/nats-io/nats.go" +) + +func TestNoRaceObjectContextOpt(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) + expectOk(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + time.AfterFunc(100*time.Millisecond, cancel) + + start := time.Now() + _, err = obs.Put(&nats.ObjectMeta{Name: "TEST"}, &slow{1000}, nats.Context(ctx)) + expectErr(t, err) + if delta := time.Since(start); delta > time.Second { + t.Fatalf("Cancel took too long: %v", delta) + } + si, err := js.StreamInfo("OBJ_OBJS") + expectOk(t, err) + if si.State.Msgs != 0 { + t.Fatalf("Expected no messages after canceling put, got %+v", si.State) + } + + // Now put a large object in there. + blob := make([]byte, 8*1024*1024) + rand.Read(blob) + _, err = obs.PutBytes("BLOB", blob) + expectOk(t, err) + + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) + time.AfterFunc(100*time.Millisecond, cancel) + + time.AfterFunc(20*time.Millisecond, func() { shutdown(s) }) + start = time.Now() + _, err = obs.GetBytes("BLOB", nats.Context(ctx)) + expectErr(t, err) + if delta := time.Since(start); delta > time.Second { + t.Fatalf("Cancel took too long: %v", delta) + } +} + +type slow struct{ n int } + +func (sr *slow) Read(p []byte) (n int, err error) { + if sr.n <= 0 { + return 0, io.EOF + } + sr.n-- + time.Sleep(10 * time.Millisecond) + p[0] = 'A' + return 1, nil +} diff --git a/test/object_test.go b/test/object_test.go new file mode 100644 index 000000000..c1c70684e --- /dev/null +++ b/test/object_test.go @@ -0,0 +1,539 @@ +// Copyright 2021 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "bytes" + "crypto/rand" + "io/ioutil" + "os" + "path" + "path/filepath" + "reflect" + "testing" + "time" + + "github.com/nats-io/nats.go" +) + +func TestObjectBasics(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) + expectOk(t, err) + + // Create ~16MB object. + blob := make([]byte, 16*1024*1024+22) + rand.Read(blob) + + now := time.Now().UTC().Round(time.Second) + _, err = obs.PutBytes("BLOB", blob) + expectOk(t, err) + + // Test info + info, err := obs.GetInfo("BLOB") + expectOk(t, err) + if len(info.NUID) == 0 { + t.Fatalf("Expected object to have a NUID") + } + if info.ModTime.IsZero() { + t.Fatalf("Expected object to have a non-zero ModTime") + } + if mt := info.ModTime.Round(time.Second); mt.Sub(now) != 0 && mt.Sub(now) != time.Second { + t.Fatalf("Expected ModTime to be about %v, got %v", now, mt) + } + + // Make sure the stream is sealed. + err = obs.Seal() + expectOk(t, err) + si, err := js.StreamInfo("OBJ_OBJS") + expectOk(t, err) + if !si.Config.Sealed { + t.Fatalf("Expected the object stream to be sealed, got %+v", si) + } + + // Check simple errors. + _, err = obs.Get("FOO") + expectErr(t, err) + + // Now get the object back. + result, err := obs.Get("BLOB") + expectOk(t, err) + expectOk(t, result.Error()) + defer result.Close() + + // Check info. + info, err = result.Info() + expectOk(t, err) + if info.Size != uint64(len(blob)) { + t.Fatalf("Size does not match, %d vs %d", info.Size, len(blob)) + } + + // Check result. + copy, err := ioutil.ReadAll(result) + expectOk(t, err) + if !bytes.Equal(copy, blob) { + t.Fatalf("Result not the same") + } + // Test delete. + err = js.DeleteObjectStore("OBJS") + expectOk(t, err) + _, err = obs.Get("BLOB") + expectErr(t, err, nats.ErrStreamNotFound) +} + +func TestObjectFileBasics(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "FILES"}) + expectOk(t, err) + + // Create ~8MB object. + blob := make([]byte, 8*1024*1024+33) + rand.Read(blob) + + tmpFile, err := ioutil.TempFile("", "objfile") + expectOk(t, err) + defer os.Remove(tmpFile.Name()) // clean up + err = ioutil.WriteFile(tmpFile.Name(), blob, 0600) + expectOk(t, err) + + _, err = obs.PutFile(tmpFile.Name()) + expectOk(t, err) + + tmpResult, err := ioutil.TempFile("", "objfileresult") + expectOk(t, err) + defer os.Remove(tmpResult.Name()) // clean up + + err = obs.GetFile(tmpFile.Name(), tmpResult.Name()) + expectOk(t, err) + + // Make sure they are the same. + original, err := ioutil.ReadFile(tmpFile.Name()) + expectOk(t, err) + + restored, err := ioutil.ReadFile(tmpResult.Name()) + expectOk(t, err) + + if !bytes.Equal(original, restored) { + t.Fatalf("Files did not match") + } +} + +func TestObjectMulti(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "TEST_FILES"}) + expectOk(t, err) + + numFiles := 0 + fis, _ := ioutil.ReadDir(".") + for _, fi := range fis { + fn := fi.Name() + // Just grab clean test files. + if filepath.Ext(fn) != ".go" || fn[0] == '.' || fn[0] == '#' { + continue + } + _, err = obs.PutFile(fn) + expectOk(t, err) + numFiles++ + } + expectOk(t, obs.Seal()) + + _, err = js.StreamInfo("OBJ_TEST_FILES") + expectOk(t, err) + + result, err := obs.Get("object_test.go") + expectOk(t, err) + expectOk(t, result.Error()) + defer result.Close() + + _, err = result.Info() + expectOk(t, err) + + copy, err := ioutil.ReadAll(result) + expectOk(t, err) + + orig, err := ioutil.ReadFile(path.Join(".", "object_test.go")) + expectOk(t, err) + + if !bytes.Equal(orig, copy) { + t.Fatalf("Files did not match") + } +} + +func TestObjectDeleteMarkers(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) + expectOk(t, err) + + msg := bytes.Repeat([]byte("A"), 100) + _, err = obs.PutBytes("A", msg) + expectOk(t, err) + + err = obs.Delete("A") + expectOk(t, err) + + si, err := js.StreamInfo("OBJ_OBJS") + expectOk(t, err) + + // We should have one message left. The delete marker. + if si.State.Msgs != 1 { + t.Fatalf("Expected 1 marker msg, got %d msgs", si.State.Msgs) + } + // Make sure we have a delete marker, this will be there to drive Watch functionality. + info, err := obs.GetInfo("A") + expectOk(t, err) + if !info.Deleted { + t.Fatalf("Expected info to be marked as deleted") + } +} + +func TestObjectMultiWithDelete(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "2OD"}) + expectOk(t, err) + + pa := bytes.Repeat([]byte("A"), 2_000_000) + pb := bytes.Repeat([]byte("B"), 3_000_000) + + _, err = obs.PutBytes("A", pa) + expectOk(t, err) + + // Hold onto this so we can make sure DeleteObject clears all messages, chunks and meta. + si, err := js.StreamInfo("OBJ_2OD") + expectOk(t, err) + + _, err = obs.PutBytes("B", pb) + expectOk(t, err) + + pb2, err := obs.GetBytes("B") + expectOk(t, err) + + if !bytes.Equal(pb, pb2) { + t.Fatalf("Did not retrieve same object") + } + + // Now delete B + err = obs.Delete("B") + expectOk(t, err) + + siad, err := js.StreamInfo("OBJ_2OD") + expectOk(t, err) + if siad.State.Msgs != si.State.Msgs+1 { // +1 more delete marker. + t.Fatalf("Expected to have %d msgs after delete, got %d", siad.State.Msgs, si.State.Msgs+1) + } +} + +func TestObjectNames(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) + expectOk(t, err) + + // Test filename like naming. + _, err = obs.PutString("BLOB.txt", "A") + expectOk(t, err) + // Spaces ok + _, err = obs.PutString("foo bar", "A") + expectOk(t, err) + + // Errors + _, err = obs.PutString("*", "A") + expectErr(t, err) + _, err = obs.PutString(">", "A") + expectErr(t, err) + _, err = obs.PutString("", "A") + expectErr(t, err) + _, err = obs.PutString("", "\t") + expectErr(t, err) + _, err = obs.PutString("", "\n") + expectErr(t, err) +} + +func TestObjectMetadata(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "META-TEST"}) + expectOk(t, err) + + // Simple with no Meta. + _, err = obs.PutString("A", "AAA") + expectOk(t, err) + + meta := &nats.ObjectMeta{Name: "B"} + + err = obs.UpdateMeta("A", meta) + expectOk(t, err) + + info, err := obs.GetInfo("B") + expectOk(t, err) + if info.Name != "B" { + t.Fatalf("Update failed: %+v", info) + } + meta.Headers = make(nats.Header) + meta.Headers.Set("color", "red") + + err = obs.UpdateMeta("B", meta) + expectOk(t, err) + + info, err = obs.GetInfo("B") + expectOk(t, err) + if info.Headers == nil || info.Headers.Get("color") != "red" { + t.Fatalf("Update failed: %+v", info) + } +} + +func TestObjectWatch(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "WATCH-TEST"}) + expectOk(t, err) + + watcher, err := obs.Watch() + expectOk(t, err) + defer watcher.Stop() + + expectUpdate := func(name string) { + t.Helper() + select { + case info := <-watcher.Updates(): + if false && info.Name != name { + t.Fatalf("Expected update for %q, but got %+v", name, info) + } + case <-time.After(time.Second): + t.Fatalf("Did not receive an update like expected") + } + } + + expectNoMoreUpdates := func() { + t.Helper() + select { + case info := <-watcher.Updates(): + t.Fatalf("Got an unexpected update: %+v", info) + case <-time.After(100 * time.Millisecond): + } + } + + expectInitDone := func() { + t.Helper() + select { + case info := <-watcher.Updates(): + if info != nil { + t.Fatalf("Did not get expected: %+v", info) + } + case <-time.After(time.Second): + t.Fatalf("Did not receive a init done like expected") + } + } + + // We should get a marker that is nil when all initital values are delivered. + expectInitDone() + + _, err = obs.PutString("A", "AAA") + expectOk(t, err) + _, err = obs.PutString("B", "BBB") + expectOk(t, err) + + // Initial Values. + expectUpdate("A") + expectUpdate("B") + expectNoMoreUpdates() + + // Delete + err = obs.Delete("A") + expectOk(t, err) + + expectUpdate("A") + expectNoMoreUpdates() + + // New + _, err = obs.PutString("C", "CCC") + expectOk(t, err) + + // Update Meta + info, err := obs.GetInfo("A") + expectOk(t, err) + meta := &info.ObjectMeta + meta.Description = "A's are better than B's" + err = obs.UpdateMeta("A", meta) + expectOk(t, err) +} + +func TestObjectLinks(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + root, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "ROOT"}) + expectOk(t, err) + + _, err = root.PutString("A", "AAA") + expectOk(t, err) + _, err = root.PutString("B", "BBB") + expectOk(t, err) + + info, err := root.GetInfo("A") + expectOk(t, err) + + // Self link to individual object. + _, err = root.AddLink("LA", info) + expectOk(t, err) + + dir, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "DIR"}) + expectOk(t, err) + + _, err = dir.PutString("DIR/A", "DIR-AAA") + expectOk(t, err) + _, err = dir.PutString("DIR/B", "DIR-BBB") + expectOk(t, err) + + info, err = dir.GetInfo("DIR/B") + expectOk(t, err) + + binfo, err := root.AddLink("DBL", info) + expectOk(t, err) + + if binfo.Name != "DBL" || binfo.NUID == "" || binfo.ModTime.IsZero() { + t.Fatalf("Link info not what was expected: %+v", binfo) + } + + // Now add whole other store as a link, like a directory. + _, err = root.AddBucketLink("dir", dir) + expectOk(t, err) + + // Now try to get a linked object. + dbl, err := root.GetString("DBL") + expectOk(t, err) + + if dbl != "DIR-BBB" { + t.Fatalf("Expected %q but got %q", "DIR-BBB", dbl) + } +} + +// Right now no history, just make sure we are cleaning up after ourselves. +func TestObjectHistory(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) + expectOk(t, err) + + _, err = obs.PutBytes("A", bytes.Repeat([]byte("A"), 100)) + expectOk(t, err) + + _, err = obs.PutBytes("A", bytes.Repeat([]byte("a"), 100)) + expectOk(t, err) + + // Should only be 1 copy of 'A', so 1 data and 1 meta since history was not selected. + si, err := js.StreamInfo("OBJ_OBJS") + expectOk(t, err) + + if si.State.Msgs != 2 { + t.Fatalf("Expected 2 msgs (1 data 1 meta) but got %d", si.State.Msgs) + } +} + +func TestObjectList(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + root, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "ROOT"}) + expectOk(t, err) + + put := func(name, value string) { + _, err = root.PutString(name, value) + expectOk(t, err) + } + + put("A", "AAA") + put("B", "BBB") + put("C", "CCC") + put("B", "bbb") + + // Self link + info, err := root.GetInfo("B") + expectOk(t, err) + _, err = root.AddLink("b", info) + expectOk(t, err) + + put("D", "DDD") + err = root.Delete("D") + expectOk(t, err) + + lch, err := root.List() + expectOk(t, err) + + omap := make(map[string]struct{}) + for _, info := range lch { + if _, ok := omap[info.Name]; ok { + t.Fatalf("Already saw %q", info.Name) + } + omap[info.Name] = struct{}{} + } + if len(omap) != 4 { + t.Fatalf("Expected 4 total objects, got %d", len(omap)) + } + expected := map[string]struct{}{ + "A": struct{}{}, + "B": struct{}{}, + "C": struct{}{}, + "b": struct{}{}, + } + if !reflect.DeepEqual(omap, expected) { + t.Fatalf("Expected %+v but got %+v", expected, omap) + } +}