diff --git a/js.go b/js.go index 1cc2ae863..81f9db5b0 100644 --- a/js.go +++ b/js.go @@ -30,78 +30,6 @@ import ( "github.com/nats-io/nuid" ) -// Request API subjects for JetStream. -const ( - // defaultAPIPrefix is the default prefix for the JetStream API. - defaultAPIPrefix = "$JS.API." - - // jsDomainT is used to create JetStream API prefix by specifying only Domain - jsDomainT = "$JS.%s.API." - - // apiAccountInfo is for obtaining general information about JetStream. - apiAccountInfo = "INFO" - - // apiConsumerCreateT is used to create consumers. - apiConsumerCreateT = "CONSUMER.CREATE.%s" - - // apiDurableCreateT is used to create durable consumers. - apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s" - - // apiConsumerInfoT is used to create consumers. - apiConsumerInfoT = "CONSUMER.INFO.%s.%s" - - // apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. - apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s" - - // apiDeleteConsumerT is used to delete consumers. - apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s" - - // apiConsumerListT is used to return all detailed consumer information - apiConsumerListT = "CONSUMER.LIST.%s" - - // apiConsumerNamesT is used to return a list with all consumer names for the stream. - apiConsumerNamesT = "CONSUMER.NAMES.%s" - - // apiStreams can lookup a stream by subject. - apiStreams = "STREAM.NAMES" - - // apiStreamCreateT is the endpoint to create new streams. - apiStreamCreateT = "STREAM.CREATE.%s" - - // apiStreamInfoT is the endpoint to get information on a stream. - apiStreamInfoT = "STREAM.INFO.%s" - - // apiStreamUpdate is the endpoint to update existing streams. - apiStreamUpdateT = "STREAM.UPDATE.%s" - - // apiStreamDeleteT is the endpoint to delete streams. - apiStreamDeleteT = "STREAM.DELETE.%s" - - // apiPurgeStreamT is the endpoint to purge streams. - apiStreamPurgeT = "STREAM.PURGE.%s" - - // apiStreamListT is the endpoint that will return all detailed stream information - apiStreamList = "STREAM.LIST" - - // apiMsgGetT is the endpoint to get a message. - apiMsgGetT = "STREAM.MSG.GET.%s" - - // apiMsgDeleteT is the endpoint to remove a message. - apiMsgDeleteT = "STREAM.MSG.DELETE.%s" - - // orderedHeartbeatsInterval is how fast we want HBs from the server during idle. - orderedHeartbeatsInterval = 5 * time.Second - - // Scale for threshold of missed HBs or lack of activity. - hbcThresh = 2 -) - -// Types of control messages, so far heartbeat and flow control -const ( - jsCtrlHB = 1 - jsCtrlFC = 2 -) - // JetStream allows persistent messaging through JetStream. type JetStream interface { // Publish publishes a message to JetStream. @@ -177,9 +105,82 @@ type JetStream interface { type JetStreamContext interface { JetStream JetStreamManager - ObjectStore + KeyValueManager + ObjectStoreManager } +// Request API subjects for JetStream. +const ( + // defaultAPIPrefix is the default prefix for the JetStream API. + defaultAPIPrefix = "$JS.API." + + // jsDomainT is used to create JetStream API prefix by specifying only Domain + jsDomainT = "$JS.%s.API." + + // apiAccountInfo is for obtaining general information about JetStream. + apiAccountInfo = "INFO" + + // apiConsumerCreateT is used to create consumers. + apiConsumerCreateT = "CONSUMER.CREATE.%s" + + // apiDurableCreateT is used to create durable consumers. + apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s" + + // apiConsumerInfoT is used to create consumers. + apiConsumerInfoT = "CONSUMER.INFO.%s.%s" + + // apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. + apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s" + + // apiDeleteConsumerT is used to delete consumers. + apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s" + + // apiConsumerListT is used to return all detailed consumer information + apiConsumerListT = "CONSUMER.LIST.%s" + + // apiConsumerNamesT is used to return a list with all consumer names for the stream. + apiConsumerNamesT = "CONSUMER.NAMES.%s" + + // apiStreams can lookup a stream by subject. + apiStreams = "STREAM.NAMES" + + // apiStreamCreateT is the endpoint to create new streams. + apiStreamCreateT = "STREAM.CREATE.%s" + + // apiStreamInfoT is the endpoint to get information on a stream. + apiStreamInfoT = "STREAM.INFO.%s" + + // apiStreamUpdate is the endpoint to update existing streams. + apiStreamUpdateT = "STREAM.UPDATE.%s" + + // apiStreamDeleteT is the endpoint to delete streams. + apiStreamDeleteT = "STREAM.DELETE.%s" + + // apiPurgeStreamT is the endpoint to purge streams. + apiStreamPurgeT = "STREAM.PURGE.%s" + + // apiStreamListT is the endpoint that will return all detailed stream information + apiStreamList = "STREAM.LIST" + + // apiMsgGetT is the endpoint to get a message. + apiMsgGetT = "STREAM.MSG.GET.%s" + + // apiMsgDeleteT is the endpoint to remove a message. + apiMsgDeleteT = "STREAM.MSG.DELETE.%s" + + // orderedHeartbeatsInterval is how fast we want HBs from the server during idle. + orderedHeartbeatsInterval = 5 * time.Second + + // Scale for threshold of missed HBs or lack of activity. + hbcThresh = 2 +) + +// Types of control messages, so far heartbeat and flow control +const ( + jsCtrlHB = 1 + jsCtrlFC = 2 +) + // js is an internal struct from a JetStreamContext. type js struct { nc *Conn @@ -312,6 +313,13 @@ const ( ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence" ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence" ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" + MsgRollup = "Nats-Rollup" +) + +// Rollups, can be subject only or all messages. +const ( + MsgRollupSubject = "sub" + MsgRollupAll = "all" ) // PublishMsg publishes a Msg to a stream from JetStream. diff --git a/kv.go b/kv.go index 63b2f026f..5ca303583 100644 --- a/kv.go +++ b/kv.go @@ -15,7 +15,6 @@ package nats import ( "context" - "encoding/json" "errors" "fmt" "strconv" @@ -23,6 +22,15 @@ import ( "time" ) +type KeyValueManager interface { + // KeyValue will lookup and bind to an existing KeyValue store. + KeyValue(bucket string) (KeyValue, error) + // CreateKeyValue will create a KeyValue store with the following configuration. + CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) + // DeleteKeyValue will delete this KeyValue store (JetStream stream). + DeleteKeyValue(bucket string) error +} + type KeyValue interface { // Get returns the latest value for the key. Get(key string) (entry KeyValueEntry, err error) @@ -107,19 +115,15 @@ const ( ) // KeyValue will lookup and bind to an existing KeyValue store. -func (nc *Conn) KeyValue(bucket string, opts ...JSOpt) (KeyValue, error) { +func (js *js) KeyValue(bucket string) (KeyValue, error) { if bucket == _EMPTY_ { return nil, ErrBucketNameRequired } if strings.Contains(bucket, ".") { return nil, ErrInvalidBucketName } - jsc, err := nc.JetStream(opts...) - if err != nil { - return nil, err - } stream := fmt.Sprintf(kvBucketNameTmpl, bucket) - si, err := jsc.StreamInfo(stream) + si, err := js.StreamInfo(stream) if err != nil { if err == ErrStreamNotFound { err = ErrBucketNotFound @@ -136,26 +140,20 @@ func (nc *Conn) KeyValue(bucket string, opts ...JSOpt) (KeyValue, error) { name: bucket, stream: stream, pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket), - nc: nc, - js: jsc.(*js), + js: js, } return kv, nil } -// AddKeyValue will create a KeyValue store with the following configuration. -func (nc *Conn) AddKeyValue(cfg *KeyValueConfig, opts ...JSOpt) (KeyValue, error) { +// CreateKeyValue will create a KeyValue store with the following configuration. +func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { if cfg == nil || cfg.Bucket == _EMPTY_ { return nil, ErrBucketNameRequired } if strings.Contains(cfg.Bucket, ".") { return nil, ErrInvalidBucketName } - - jsc, err := nc.JetStream(opts...) - if err != nil { - return nil, err - } - if _, err = jsc.AccountInfo(); err != nil { + if _, err := js.AccountInfo(); err != nil { return nil, err } @@ -181,7 +179,7 @@ func (nc *Conn) AddKeyValue(cfg *KeyValueConfig, opts ...JSOpt) (KeyValue, error Replicas: replicas, } - if _, err := jsc.AddStream(scfg); err != nil { + if _, err := js.AddStream(scfg); err != nil { return nil, err } @@ -189,27 +187,21 @@ func (nc *Conn) AddKeyValue(cfg *KeyValueConfig, opts ...JSOpt) (KeyValue, error name: cfg.Bucket, stream: scfg.Name, pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket), - nc: nc, - js: jsc.(*js), + js: js, } return kv, nil } // DeleteKeyValue will delete this KeyValue store (JetStream stream). -func (nc *Conn) DeleteKeyValue(bucket string, opts ...JSOpt) error { - js, err := nc.JetStream(opts...) - if err != nil { - return err - } +func (js *js) DeleteKeyValue(bucket string) error { stream := fmt.Sprintf(kvBucketNameTmpl, bucket) - return js.DeleteStream(stream, opts...) + return js.DeleteStream(stream) } type kvs struct { name string stream string pre string - nc *Conn js *js } @@ -316,14 +308,6 @@ type purgeRequest struct { // Delete the key and all revisions. func (kv *kvs) Delete(key string) error { - o, cancel, err := getJSContextOpts(kv.js.opts) - if err != nil { - return err - } - if cancel != nil { - defer cancel() - } - var b strings.Builder b.WriteString(kv.pre) b.WriteString(key) @@ -335,25 +319,11 @@ func (kv *kvs) Delete(key string) error { if err != nil { return err } - preq, err := json.Marshal(&purgeRequest{Subject: b.String(), Keep: 1}) + err = kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String(), Keep: 1}) if err != nil { return err } - // Send request. - subj := kv.js.apiSubj(fmt.Sprintf(apiStreamPurgeT, kv.stream)) - r, err := kv.nc.RequestWithContext(o.ctx, subj, preq) - if err != nil { - return err - } - var resp streamPurgeResponse - if err := json.Unmarshal(r.Data, &resp); err != nil { - return err - } - if resp.Error != nil { - return errors.New(resp.Error.Description) - } - // Double check the pubAck future. select { case <-paf.Ok(): diff --git a/object.go b/object.go index 80bd7c69f..29d4f24e7 100644 --- a/object.go +++ b/object.go @@ -21,63 +21,120 @@ import ( "errors" "fmt" "io" - "math/rand" "net" "os" "strings" "sync" "time" + + "github.com/nats-io/nuid" ) -type ObjectStore interface { +type ObjectStoreManager interface { + // ObjectStore will lookup and bind to an existing object store instance. + ObjectStore(bucket string) (ObjectStore, error) // CreateObjectStore will create an object store. - CreateObjectStore(cfg *ObjectConfig) error - // SealObjectStore will seal the underlying stream. - SealObjectStore(store string) error + CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) // DeleteObjectStore will delete the underlying stream for the named object. - DeleteObjectStore(store string) error + DeleteObjectStore(bucket string) error +} + +type ObjectStore interface { + // Put will place the contents from the reader into a new object. + Put(obj *ObjectMeta, reader io.Reader) (*ObjectInfo, error) + // Get will pull the named object from the object store. + Get(name string) (ObjectResult, error) + + // Convenience functions for low level Put and Get. + + // PutBytes is convenience function to put a byte slice into this object store. + PutBytes(name string, data []byte) (*ObjectInfo, error) + // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. + GetBytes(name string) ([]byte, error) + + // PutBytes is convenience function to put a string into this object store. + PutString(name string, data string) (*ObjectInfo, error) + // GetString is a convenience function to pull an object from this object store and return it as a string. + GetString(name string) (string, error) + + // PutFile is convenience function to put a file into this object store. + PutFile(file string) (*ObjectInfo, error) + // GetFile is a convenience function to pull an object from this object store and place it in a file. + GetFile(name, file string) error + + // GetInfo will retrieve the current information for the object. + GetInfo(name string) (*ObjectInfo, error) + // UpdateMeta will update the meta data for the object. + UpdateMeta(name string, meta *ObjectMeta) error + + // Delete will delete the named object. + Delete(name string) error - // PutObject will place the contents from the reader into a new object store. - PutObject(store string, meta *ObjectMeta, reader io.Reader) error - // GetObject will pull the object from the object store. - GetObject(store, name string) (ObjectResult, error) - // DeleteObject will delete the named object from a multi-use store. - DeleteObject(store, name string) error + // AddLink will add a link to another object into this object store. + AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) - // PutFile is convenience function to put a file into a new object store. - PutFile(store string, filename string) error - // GetFile is a convenience function to pull an object from a store and place it in a file. - GetFile(store, name, outfile string) error + // AddBucket will add a link to another object store. + AddBucket(name string, bucket ObjectStore) (*ObjectInfo, error) + + // Seal will seal the object store, no further modifications will be allowed. + Seal() error + + // Watch for changes in the underlying store and receive meta information updates. + Watch(cb ObjectStoreUpdate) (*Subscription, error) } +type ObjectStoreUpdate func(meta *ObjectInfo) + var ( - errBadMeta = errors.New("nats: object stream meta information invalid") - errBadObjectName = errors.New("nats: invalid object name") + errBadMeta = errors.New("nats: object stream meta information invalid") + errBadObjectName = errors.New("nats: invalid object name") + errDigestMismatch = errors.New("nats: received corrupt object, digests do not match") ) +// ObjectStoreConfig is the config for the object store. +type ObjectStoreConfig struct { + Bucket string + Description string + TTL time.Duration + Storage StorageType + Replicas int +} + +// ObjectMetaOptions +type ObjectMetaOptions struct { + Link *ObjectLink `json:"link,omitempty"` + ChunkSize uint32 `json:"max_chunk_size,omitempty"` +} + // ObjectMeta is high level information about an object. type ObjectMeta struct { Name string `json:"name"` Description string `json:"description,omitempty"` Headers Header `json:"headers,omitempty"` - ChunkSize uint32 `json:"chunk_size,omitempty"` -} -// Config for the object store. -type ObjectConfig struct { - Name string `json:"name"` - Description string `json:"description,omitempty"` - Headers Header `json:"headers,omitempty"` - TTL time.Duration `json:"ttl,omitempty"` - Storage StorageType `json:"storage"` - Replicas int `json:"replicas"` + // Optional options. + Opts *ObjectMetaOptions `json:"options,omitempty"` } +// ObjectInfo is meta plus instance information. type ObjectInfo struct { ObjectMeta - Size uint64 `json:"size"` - Chunks uint32 `json:"chunks"` - Digest string `json:"digest,omitempty"` + Bucket string `json:"bucket"` + NUID string `json:"nuid"` + Size uint64 `json:"size"` + ModTime time.Time `json:"mtime"` + Chunks uint32 `json:"chunks"` + Digest string `json:"digest,omitempty"` + Deleted bool `json:"deleted,omitempty"` +} + +// ObjectLink is used to embed links to other buckets and objects. +type ObjectLink struct { + // Bucket is the name of the other object store. + Bucket string `json:"bucket"` + // Name can be used to link to a single object. + // If empty means this is a link to the whole store, like a directory. + Name string `json:"name,omitempty"` } // ObjectResult will return the underlying stream info and also be an io.ReadCloser. @@ -90,30 +147,29 @@ type ObjectResult interface { const ( objNameTmpl = "OBJ_%s" objSubjectsPre = "$O." - objAllChunksPreTmpl = "$O.%s.DATA.>" - objAllMetaPreTmpl = "$O.%s.META.>" - objChunksPreTmpl = "$O.%s.DATA.%s" - objMetaPreTmpl = "$O.%s.META.%s" + objAllChunksPreTmpl = "$O.%s.C.>" + objAllMetaPreTmpl = "$O.%s.M.>" + objChunksPreTmpl = "$O.%s.C.%s" + objMetaPreTmpl = "$O.%s.M.%s" objNoPending = "0" - objDefaultChunkSize = 128 * 1024 // 128k + objDefaultChunkSize = uint32(128 * 1024) // 128k objDigestType = "sha-256=" objDigestTmpl = objDigestType + "%s" - objChunkTokenHdr = "Chunk-Token" ) +type obs struct { + name string + stream string + js *js +} + // CreateObjectStore will create an object store. -func (js *js) CreateObjectStore(cfg *ObjectConfig) error { - if cfg == nil || cfg.Name == _EMPTY_ { - return ErrStreamNameRequired - } - name := sanitizeName(cfg.Name) - if !nameOk(name) { - return ErrInvalidStreamName - } - if cfg.Replicas == 0 { - cfg.Replicas = 1 +func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) { + if cfg == nil || cfg.Bucket == _EMPTY_ { + return nil, ErrStreamNameRequired } + name := cfg.Bucket chunks := fmt.Sprintf(objAllChunksPreTmpl, name) meta := fmt.Sprintf(objAllMetaPreTmpl, name) @@ -129,25 +185,26 @@ func (js *js) CreateObjectStore(cfg *ObjectConfig) error { // Create our stream. _, err := js.AddStream(scfg) - return err + if err != nil { + return nil, err + } + + return &obs{name: name, stream: scfg.Name, js: js}, nil } -func (js *js) SealObjectStore(store string) error { - stream := fmt.Sprintf(objNameTmpl, store) +// ObjectStore will lookup and bind to an existing object store instance. +func (js *js) ObjectStore(bucket string) (ObjectStore, error) { + stream := fmt.Sprintf(objNameTmpl, bucket) si, err := js.StreamInfo(stream) if err != nil { - return err + return nil, err } - // Seal the stream from being able to take on more messages. - cfg := si.Config - cfg.Sealed = true - _, err = js.UpdateStream(&cfg) - return err + return &obs{name: bucket, stream: si.Config.Name, js: js}, nil } // DeleteObjectStore will delete the underlying stream for the named object. -func (js *js) DeleteObjectStore(store string) error { - stream := fmt.Sprintf(objNameTmpl, store) +func (js *js) DeleteObjectStore(bucket string) error { + stream := fmt.Sprintf(objNameTmpl, bucket) return js.DeleteStream(stream) } @@ -170,36 +227,20 @@ func sanitizeName(name string) string { return strings.ReplaceAll(stream, " ", "_") } -const ( - alpha = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" - chunkTokenLen = 8 -) - -func generateChunkToken() string { - var b [chunkTokenLen]byte - rn := rand.Uint64() - for i, l := 0, rn; i < len(b); i++ { - b[i] = alpha[l%base] - l /= base - } - return string(b[:]) -} - // PutObject will place the contents from the reader into a new stream. -func (ojs *js) PutObject(store string, meta *ObjectMeta, r io.Reader) error { +func (obs *obs) Put(meta *ObjectMeta, r io.Reader) (*ObjectInfo, error) { if meta == nil { - return errors.New("nats: object meta required") + return nil, errBadMeta } - name := sanitizeName(meta.Name) - if !nameOk(name) { - return errBadObjectName + obj := sanitizeName(meta.Name) + if !nameOk(obj) { + return nil, errBadObjectName } // Create a random subject prefixed with the object stream name. - - chunkToken := generateChunkToken() - chunkSubj := fmt.Sprintf(objChunksPreTmpl, store, chunkToken) - metaSubj := fmt.Sprintf(objMetaPreTmpl, store, name) + id := nuid.Next() + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, id) + metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, obj) // For async error handling var perr error @@ -215,26 +256,24 @@ func (ojs *js) PutObject(store string, meta *ObjectMeta, r io.Reader) error { return perr } - purgePartial := func() { - stream := fmt.Sprintf(objNameTmpl, store) - ojs.purgeStream(stream, &streamPurgeRequest{Subject: chunkSubj}) - } + purgePartial := func() { obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) } // Create our own JS context to handle errors etc. - js, err := ojs.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) })) + js, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) })) if err != nil { - return err + return nil, err } - if meta.ChunkSize == 0 { - meta.ChunkSize = objDefaultChunkSize + chunkSize := objDefaultChunkSize + if meta.Opts != nil && meta.Opts.ChunkSize > 0 { + chunkSize = meta.Opts.ChunkSize } m, h := NewMsg(chunkSubj), sha256.New() - chunk, sent, total := make([]byte, meta.ChunkSize), 0, uint64(0) - info := &ObjectInfo{ObjectMeta: *meta} + chunk, sent, total := make([]byte, chunkSize), 0, uint64(0) + info := &ObjectInfo{Bucket: obs.name, NUID: id, ObjectMeta: *meta} - for { + for r != nil { n, err := r.Read(chunk) // EOF Processing. @@ -244,22 +283,10 @@ func (ojs *js) PutObject(store string, meta *ObjectMeta, r io.Reader) error { // Place meta info. info.Size, info.Chunks = uint64(total), uint32(sent) info.Digest = fmt.Sprintf(objDigestTmpl, base64.URLEncoding.EncodeToString(sha[:])) - mm := NewMsg(metaSubj) - mm.Header.Set(objChunkTokenHdr, chunkToken) - mm.Data, err = json.Marshal(info) - if err != nil { - purgePartial() - return err - } - _, err = js.PublishMsgAsync(mm) - if err != nil { - purgePartial() - return err - } break } else if err != nil { purgePartial() - return err + return nil, err } // Chunk processing. @@ -269,28 +296,45 @@ func (ojs *js) PutObject(store string, meta *ObjectMeta, r io.Reader) error { // Send msg itself. if _, err := js.PublishMsgAsync(m); err != nil { purgePartial() - return err + return nil, err } if err := getErr(); err != nil { purgePartial() - return err + return nil, err } // Update totals. sent++ total += uint64(n) } + // Publish the metadata. + mm := NewMsg(metaSubj) + mm.Data, err = json.Marshal(info) + if err != nil { + if r != nil { + purgePartial() + } + return nil, err + } + _, err = js.PublishMsgAsync(mm) + if err != nil { + if r != nil { + purgePartial() + } + return nil, err + } + // Wait for all to be processed. select { case <-js.PublishAsyncComplete(): if err := getErr(); err != nil { - return err + return nil, err } - case <-time.After(ojs.opts.wait): - return ErrTimeout + case <-time.After(obs.js.opts.wait): + return nil, ErrTimeout } - - return nil + info.ModTime = time.Now().UTC() + return info, nil } // ObjectResult impl. @@ -301,34 +345,40 @@ type objResult struct { err error } -// GetObject will pull the object from the underlying stream. -func (js *js) GetObject(store, name string) (ObjectResult, error) { - // Lookup the stream to get the bound subject. - name = sanitizeName(name) - if !nameOk(name) { - return nil, errBadObjectName - } - - // Grab last meta value we have. - meta := fmt.Sprintf(objMetaPreTmpl, store, name) - stream := fmt.Sprintf(objNameTmpl, store) +func (info *ObjectInfo) isLink() bool { + return info.ObjectMeta.Opts != nil && info.ObjectMeta.Opts.Link != nil +} - m, err := js.GetLastMsg(stream, meta) +// GetObject will pull the object from the underlying stream. +func (obs *obs) Get(name string) (ObjectResult, error) { + // Grab meta info. + info, err := obs.GetInfo(name) if err != nil { return nil, err } - - chunkToken := m.Header.Get(objChunkTokenHdr) - if chunkToken == _EMPTY_ { + if info.NUID == _EMPTY_ { return nil, errBadMeta } - var info ObjectInfo - if err := json.Unmarshal(m.Data, &info); err != nil { - return nil, errBadMeta + + // Check for object links.If single objects we do a pass through. + if info.isLink() { + if info.ObjectMeta.Opts.Link.Name == _EMPTY_ { + return nil, errors.New("nats: link is a bucket") + } + lobs, err := obs.js.ObjectStore(info.ObjectMeta.Opts.Link.Bucket) + if err != nil { + return nil, err + } + return lobs.Get(info.ObjectMeta.Opts.Link.Name) + } + + result := &objResult{info: info} + if info.Size == 0 { + return result, nil } pr, pw := net.Pipe() - result := &objResult{info: &info, r: pr} + result.r = pr gotErr := func(m *Msg, err error) { pw.Close() @@ -371,14 +421,14 @@ func (js *js) GetObject(store, name string) (ObjectResult, error) { return } if !bytes.Equal(sha[:], rsha) { - gotErr(m, errors.New("nats: received corrupt object, digests do not match")) + gotErr(m, errDigestMismatch) return } } } - chunkSubj := fmt.Sprintf(objChunksPreTmpl, store, chunkToken) - _, err = js.Subscribe(chunkSubj, processChunk, OrderedConsumer()) + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) + _, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer()) if err != nil { return nil, err } @@ -386,63 +436,227 @@ func (js *js) GetObject(store, name string) (ObjectResult, error) { return result, nil } -func (js *js) DeleteObject(store, name string) error { - // Lookup the stream to get the bound subject. - name = sanitizeName(name) - if !nameOk(name) { - return errBadObjectName - } - stream := fmt.Sprintf(objNameTmpl, store) - - // Grab last meta value we have. - meta := fmt.Sprintf(objMetaPreTmpl, store, name) - m, err := js.GetLastMsg(stream, meta) +// Delete will delete the object. +func (obs *obs) Delete(name string) error { + // Grab meta info. + info, err := obs.GetInfo(name) if err != nil { return err } - chunkToken := m.Header.Get(objChunkTokenHdr) - if chunkToken == _EMPTY_ { + if info.NUID == _EMPTY_ { return errBadMeta } - chunkSubj := fmt.Sprintf(objChunksPreTmpl, store, chunkToken) - // Delete Meta - err = js.DeleteMsg(stream, m.Sequence) + + // Place a rollup delete marker. + info.Deleted = true + info.Size, info.Chunks, info.Digest = 0, 0, _EMPTY_ + + metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, sanitizeName(name)) + mm := NewMsg(metaSubj) + mm.Data, err = json.Marshal(info) + if err != nil { + return err + } + mm.Header.Set(MsgRollup, MsgRollupSubject) + _, err = obs.js.PublishMsg(mm) if err != nil { return err } + // Purge chunks for the object. - return js.purgeStream(stream, &streamPurgeRequest{Subject: chunkSubj}) + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) + return obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) +} + +// AddLink will add a link to another object into this object store. +func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) { + if obj == nil { + return nil, errors.New("nats: object required") + } + if obj.Deleted { + return nil, errors.New("nats: object is deleted") + } + // Same object store. + if obj.Bucket == obs.name { + info := *obj + info.Name = name + if err := obs.UpdateMeta(obj.Name, &info.ObjectMeta); err != nil { + return nil, err + } + return obs.GetInfo(name) + } + + link := &ObjectLink{Bucket: obj.Bucket, Name: obj.Name} + meta := &ObjectMeta{ + Name: name, + Opts: &ObjectMetaOptions{Link: link}, + } + return obs.Put(meta, nil) +} + +// AddBucket will add a link to another object store. +func (ob *obs) AddBucket(name string, bucket ObjectStore) (*ObjectInfo, error) { + if bucket == nil { + return nil, errors.New("nats: bucket required") + } + bos, ok := bucket.(*obs) + if !ok { + return nil, errors.New("nats: bucket malformed") + } + meta := &ObjectMeta{ + Name: name, + Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}}, + } + return ob.Put(meta, nil) +} + +// PutBytes is convenience function to put a byte slice into this object store. +func (obs *obs) PutBytes(name string, data []byte) (*ObjectInfo, error) { + return obs.Put(&ObjectMeta{Name: name}, bytes.NewReader(data)) +} + +// GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. +func (obs *obs) GetBytes(name string) ([]byte, error) { + result, err := obs.Get(name) + if err != nil { + return nil, err + } + defer result.Close() + + var b bytes.Buffer + if _, err := b.ReadFrom(result); err != nil { + return nil, err + } + return b.Bytes(), nil +} + +// PutBytes is convenience function to put a string into this object store. +func (obs *obs) PutString(name string, data string) (*ObjectInfo, error) { + return obs.Put(&ObjectMeta{Name: name}, strings.NewReader(data)) +} + +// GetString is a convenience function to pull an object from this object store and return it as a string. +func (obs *obs) GetString(name string) (string, error) { + result, err := obs.Get(name) + if err != nil { + return _EMPTY_, err + } + defer result.Close() + + var b bytes.Buffer + if _, err := b.ReadFrom(result); err != nil { + return _EMPTY_, err + } + return b.String(), nil } // PutFile is convenience function to put a file into an object store. -func (js *js) PutFile(store string, filename string) error { - f, err := os.Open(filename) +func (obs *obs) PutFile(file string) (*ObjectInfo, error) { + f, err := os.Open(file) if err != nil { - return err + return nil, err } defer f.Close() - return js.PutObject(store, &ObjectMeta{Name: filename}, f) + return obs.Put(&ObjectMeta{Name: file}, f) } // GetFile is a convenience function to pull and object and place in a file. -func (js *js) GetFile(store, name, outfile string) error { +func (obs *obs) GetFile(name, file string) error { // Expect file to be new. - f, err := os.OpenFile(outfile, os.O_WRONLY|os.O_CREATE, 0600) + f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE, 0600) if err != nil { return err } defer f.Close() - result, err := js.GetObject(store, name) + result, err := obs.Get(name) if err != nil { - defer os.Remove(f.Name()) + os.Remove(f.Name()) return err } + defer result.Close() + // Stream copy to the file. _, err = io.Copy(f, result) return err } +// GetInfo will retrieve the current information for the object. +func (obs *obs) GetInfo(name string) (*ObjectInfo, error) { + // Lookup the stream to get the bound subject. + obj := sanitizeName(name) + if !nameOk(obj) { + return nil, errBadObjectName + } + + // Grab last meta value we have. + meta := fmt.Sprintf(objMetaPreTmpl, obs.name, obj) + stream := fmt.Sprintf(objNameTmpl, obs.name) + + m, err := obs.js.GetLastMsg(stream, meta) + if err != nil { + return nil, err + } + var info ObjectInfo + if err := json.Unmarshal(m.Data, &info); err != nil { + return nil, errBadMeta + } + info.ModTime = m.Time + return &info, nil +} + +// UpdateMeta will update the meta data for the object. +func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { + if meta == nil { + return errBadMeta + } + // Grab meta info. + info, err := obs.GetInfo(name) + if err != nil { + return err + } + // Copy new meta + info.ObjectMeta = *meta + mm := NewMsg(fmt.Sprintf(objMetaPreTmpl, obs.name, sanitizeName(meta.Name))) + mm.Data, err = json.Marshal(info) + if err != nil { + return err + } + _, err = obs.js.PublishMsg(mm) + return err +} + +// Seal will seal the object store, no further modifications will be allowed. +func (obs *obs) Seal() error { + stream := fmt.Sprintf(objNameTmpl, obs.name) + si, err := obs.js.StreamInfo(stream) + if err != nil { + return err + } + // Seal the stream from being able to take on more messages. + cfg := si.Config + cfg.Sealed = true + _, err = obs.js.UpdateStream(&cfg) + return err +} + +// Watch for changes in the underlying store and receive meta information updates. +func (obs *obs) Watch(cb ObjectStoreUpdate) (*Subscription, error) { + update := func(m *Msg) { + var info ObjectInfo + if err := json.Unmarshal(m.Data, &info); err != nil { + return // TODO(dlc) - Communicate this upwards? + } + if meta, err := m.Metadata(); err == nil { + info.ModTime = meta.Timestamp + } + cb(&info) + } + // Used ordered consumer to deliver results. + allMeta := fmt.Sprintf(objAllMetaPreTmpl, obs.name) + return obs.js.Subscribe(allMeta, update, OrderedConsumer(), DeliverLastPerSubject()) +} + // Read impl. func (o *objResult) Read(p []byte) (n int, err error) { o.Lock() @@ -450,11 +664,19 @@ func (o *objResult) Read(p []byte) (n int, err error) { if o.err != nil { return 0, err } + if o.r == nil { + return 0, io.EOF + } return o.r.Read(p) } // Close impl. func (o *objResult) Close() error { + o.Lock() + defer o.Unlock() + if o.r == nil { + return nil + } return o.r.Close() } diff --git a/test/kv_test.go b/test/kv_test.go index 56debcc9f..8a47e9efe 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -28,10 +28,10 @@ func TestKeyValueBasics(t *testing.T) { s := RunBasicJetStreamServer() defer shutdown(s) - nc := client(t, s) + nc, js := jsClient(t, s) defer nc.Close() - kv, err := nc.AddKeyValue(&nats.KeyValueConfig{Bucket: "TEST"}) + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"}) expectOk(t, err) if kv.Bucket() != "TEST" { @@ -82,10 +82,10 @@ func TestKeyValueList(t *testing.T) { s := RunBasicJetStreamServer() defer shutdown(s) - nc := client(t, s) + nc, js := jsClient(t, s) defer nc.Close() - kv, err := nc.AddKeyValue(&nats.KeyValueConfig{Bucket: "LIST", History: 10}) + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "LIST", History: 10}) expectOk(t, err) for i := 0; i < 50; i++ { @@ -120,10 +120,10 @@ func TestKeyValueWatch(t *testing.T) { s := RunBasicJetStreamServer() defer shutdown(s) - nc := client(t, s) + nc, js := jsClient(t, s) defer nc.Close() - kv, err := nc.AddKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) expectOk(t, err) updates := make(chan nats.KeyValueEntry, 32) @@ -198,11 +198,11 @@ func TestKeyValueBindStore(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - _, err := nc.AddKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) + _, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) expectOk(t, err) // Now bind to it.. - _, err = nc.KeyValue("WATCH") + _, err = js.KeyValue("WATCH") expectOk(t, err) // Make sure we can't bind to a non-kv style stream. @@ -213,7 +213,7 @@ func TestKeyValueBindStore(t *testing.T) { }) expectOk(t, err) - _, err = nc.KeyValue("TEST") + _, err = js.KeyValue("TEST") expectErr(t, err) if err != nats.ErrBadBucket { t.Fatalf("Expected %v but got %v", nats.ErrBadBucket, err) @@ -224,16 +224,16 @@ func TestKeyValueDeleteStore(t *testing.T) { s := RunBasicJetStreamServer() defer shutdown(s) - nc := client(t, s) + nc, js := jsClient(t, s) defer nc.Close() - _, err := nc.AddKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) + _, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) expectOk(t, err) - err = nc.DeleteKeyValue("WATCH") + err = js.DeleteKeyValue("WATCH") expectOk(t, err) - _, err = nc.KeyValue("WATCH") + _, err = js.KeyValue("WATCH") expectErr(t, err) } diff --git a/test/object_test.go b/test/object_test.go index 7e03594eb..842add2f3 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -21,6 +21,7 @@ import ( "path" "path/filepath" "testing" + "time" "github.com/nats-io/nats.go" ) @@ -32,18 +33,32 @@ func TestObjectBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - err := js.CreateObjectStore(&nats.ObjectConfig{Name: "OBJS"}) + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) expectOk(t, err) // Create ~16MB object. blob := make([]byte, 16*1024*1024+22) rand.Read(blob) - err = js.PutObject("OBJS", &nats.ObjectMeta{Name: "BLOB"}, bytes.NewReader(blob)) + now := time.Now().UTC().Round(time.Second) + _, err = obs.PutBytes("BLOB", blob) expectOk(t, err) + // Test info + info, err := obs.GetInfo("BLOB") + expectOk(t, err) + if len(info.NUID) == 0 { + t.Fatalf("Expected object to have a NUID") + } + if info.ModTime.IsZero() { + t.Fatalf("Expected object to have a non-zero ModTime") + } + if mt := info.ModTime.Round(time.Second); mt.Sub(now) != 0 && mt.Sub(now) != time.Second { + t.Fatalf("Expected ModTime to be about %v, got %v", now, mt) + } + // Make sure the stream is sealed. - err = js.SealObjectStore("OBJS") + err = obs.Seal() expectOk(t, err) si, err := js.StreamInfo("OBJ_OBJS") expectOk(t, err) @@ -52,17 +67,17 @@ func TestObjectBasics(t *testing.T) { } // Check simple errors. - _, err = js.GetObject("OBJS", "FOO") + _, err = obs.Get("FOO") expectErr(t, err) // Now get the object back. - result, err := js.GetObject("OBJS", "BLOB") + result, err := obs.Get("BLOB") expectOk(t, err) expectOk(t, result.Error()) defer result.Close() - // Check info stuff. - info, err := result.Info() + // Check info. + info, err = result.Info() expectOk(t, err) if info.Size != uint64(len(blob)) { t.Fatalf("Size does not match, %d vs %d", info.Size, len(blob)) @@ -77,7 +92,7 @@ func TestObjectBasics(t *testing.T) { // Test delete. err = js.DeleteObjectStore("OBJS") expectOk(t, err) - _, err = js.GetObject("OBJS", "BLOB") + _, err = obs.Get("BLOB") expectErr(t, err, nats.ErrStreamNotFound) } @@ -88,7 +103,7 @@ func TestObjectFileBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - err := js.CreateObjectStore(&nats.ObjectConfig{Name: "FILES"}) + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "FILES"}) expectOk(t, err) // Create ~8MB object. @@ -101,14 +116,14 @@ func TestObjectFileBasics(t *testing.T) { err = ioutil.WriteFile(tmpFile.Name(), blob, 0600) expectOk(t, err) - err = js.PutFile("FILES", tmpFile.Name()) + _, err = obs.PutFile(tmpFile.Name()) expectOk(t, err) tmpResult, err := ioutil.TempFile("", "objfileresult") expectOk(t, err) defer os.Remove(tmpResult.Name()) // clean up - err = js.GetFile("FILES", tmpFile.Name(), tmpResult.Name()) + err = obs.GetFile(tmpFile.Name(), tmpResult.Name()) expectOk(t, err) // Make sure they are the same. @@ -130,8 +145,7 @@ func TestObjectMulti(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - on := "TEST_FILES" - err := js.CreateObjectStore(&nats.ObjectConfig{Name: on}) + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "TEST_FILES"}) expectOk(t, err) numFiles := 0 @@ -142,16 +156,16 @@ func TestObjectMulti(t *testing.T) { if filepath.Ext(fn) != ".go" || fn[0] == '.' || fn[0] == '#' { continue } - err = js.PutFile(on, fn) + _, err = obs.PutFile(fn) expectOk(t, err) numFiles++ } - expectOk(t, js.SealObjectStore(on)) + expectOk(t, obs.Seal()) _, err = js.StreamInfo("OBJ_TEST_FILES") expectOk(t, err) - result, err := js.GetObject(on, "object_test.go") + result, err := obs.Get("object_test.go") expectOk(t, err) expectOk(t, result.Error()) defer result.Close() @@ -170,6 +184,38 @@ func TestObjectMulti(t *testing.T) { } } +func TestObjectDeleteMarkers(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) + expectOk(t, err) + + msg := bytes.Repeat([]byte("A"), 100) + _, err = obs.PutBytes("A", msg) + expectOk(t, err) + + err = obs.Delete("A") + expectOk(t, err) + + si, err := js.StreamInfo("OBJ_OBJS") + expectOk(t, err) + + // We should have one message left. The delete marker. + if si.State.Msgs != 1 { + t.Fatalf("Expected 1 marker msg, got %d msgs", si.State.Msgs) + } + // Make sure we have a delete marker, this will be there to drive Watch functionality. + info, err := obs.GetInfo("A") + expectOk(t, err) + if !info.Deleted { + t.Fatalf("Expected info to be marked as deleted") + } +} + func TestObjectMultiWithDelete(t *testing.T) { s := RunBasicJetStreamServer() defer shutdown(s) @@ -177,28 +223,23 @@ func TestObjectMultiWithDelete(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - err := js.CreateObjectStore(&nats.ObjectConfig{Name: "2OD"}) + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "2OD"}) expectOk(t, err) pa := bytes.Repeat([]byte("A"), 2_000_000) pb := bytes.Repeat([]byte("B"), 3_000_000) - err = js.PutObject("2OD", &nats.ObjectMeta{Name: "A"}, bytes.NewReader(pa)) + _, err = obs.PutBytes("A", pa) expectOk(t, err) // Hold onto this so we can make sure DeleteObject clears all messages, chunks and meta. si, err := js.StreamInfo("OBJ_2OD") expectOk(t, err) - err = js.PutObject("2OD", &nats.ObjectMeta{Name: "B"}, bytes.NewReader(pb)) + _, err = obs.PutBytes("B", pb) expectOk(t, err) - result, err := js.GetObject("2OD", "B") - expectOk(t, err) - expectOk(t, result.Error()) - defer result.Close() - - pb2, err := ioutil.ReadAll(result) + pb2, err := obs.GetBytes("B") expectOk(t, err) if !bytes.Equal(pb, pb2) { @@ -206,13 +247,13 @@ func TestObjectMultiWithDelete(t *testing.T) { } // Now delete B - err = js.DeleteObject("2OD", "B") + err = obs.Delete("B") expectOk(t, err) siad, err := js.StreamInfo("OBJ_2OD") expectOk(t, err) - if siad.State.Msgs != si.State.Msgs { - t.Fatalf("Expected to have %d msgs after delete, got %d", siad.State.Msgs, si.State.Msgs) + if siad.State.Msgs != si.State.Msgs+1 { // +1 more delete marker. + t.Fatalf("Expected to have %d msgs after delete, got %d", siad.State.Msgs, si.State.Msgs+1) } } @@ -223,30 +264,187 @@ func TestObjectNames(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - err := js.CreateObjectStore(&nats.ObjectConfig{Name: "OBJS"}) + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) expectOk(t, err) - // Create ~1K object. - blob := make([]byte, 1024) - rand.Read(blob) - r := bytes.NewReader(blob) - // Test filename like naming. - err = js.PutObject("OBJS", &nats.ObjectMeta{Name: "BLOB.txt"}, r) + _, err = obs.PutString("BLOB.txt", "A") expectOk(t, err) // Spaces ok - err = js.PutObject("OBJS", &nats.ObjectMeta{Name: "foo bar"}, r) + _, err = obs.PutString("foo bar", "A") expectOk(t, err) // Errors - err = js.PutObject("OBJS", &nats.ObjectMeta{Name: "*"}, r) + _, err = obs.PutString("*", "A") expectErr(t, err) - err = js.PutObject("OBJS", &nats.ObjectMeta{Name: ">"}, r) + _, err = obs.PutString(">", "A") expectErr(t, err) - err = js.PutObject("OBJS", &nats.ObjectMeta{Name: ""}, r) + _, err = obs.PutString("", "A") expectErr(t, err) - err = js.PutObject("OBJS", &nats.ObjectMeta{Name: "\t"}, r) + _, err = obs.PutString("", "\t") expectErr(t, err) - err = js.PutObject("OBJS", &nats.ObjectMeta{Name: "\n"}, r) + _, err = obs.PutString("", "\n") expectErr(t, err) } + +func TestObjectMetadata(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "META-TEST"}) + expectOk(t, err) + + // Simple with no Meta. + _, err = obs.PutString("A", "AAA") + expectOk(t, err) + + meta := &nats.ObjectMeta{Name: "B"} + + err = obs.UpdateMeta("A", meta) + expectOk(t, err) + + info, err := obs.GetInfo("B") + expectOk(t, err) + if info.Name != "B" { + t.Fatalf("Update failed: %+v", info) + } + meta.Headers = make(nats.Header) + meta.Headers.Set("color", "red") + + err = obs.UpdateMeta("B", meta) + expectOk(t, err) + + info, err = obs.GetInfo("B") + expectOk(t, err) + if info.Headers == nil || info.Headers.Get("color") != "red" { + t.Fatalf("Update failed: %+v", info) + } +} + +func TestObjectWatch(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "WATCH-TEST"}) + expectOk(t, err) + + _, err = obs.PutString("A", "AAA") + expectOk(t, err) + + _, err = obs.PutString("B", "BBB") + expectOk(t, err) + + updates := make(chan *nats.ObjectInfo, 32) + sub, err := obs.Watch(func(meta *nats.ObjectInfo) { + updates <- meta + }) + expectOk(t, err) + defer sub.Unsubscribe() + + expectUpdate := func(name string) { + t.Helper() + select { + case info := <-updates: + if false && info.Name != name { + t.Fatalf("Expected update for %q, but got %+v", name, info) + } + case <-time.After(time.Second): + t.Fatalf("Did not receive an update like expected") + } + } + + expectNoMoreUpdates := func() { + t.Helper() + select { + case info := <-updates: + t.Fatalf("Got an unexpected update: %+v", info) + case <-time.After(100 * time.Millisecond): + } + } + + // Initial Values. + expectUpdate("A") + expectUpdate("B") + expectNoMoreUpdates() + + // Delete + err = obs.Delete("A") + expectOk(t, err) + + // FIXME(dlc) - I think there is a bug in server on rollup that causes an update on "B" here. + expectUpdate("B") + + expectUpdate("A") + expectNoMoreUpdates() + + // New + _, err = obs.PutString("C", "CCC") + expectOk(t, err) + + // Update Meta + info, err := obs.GetInfo("A") + expectOk(t, err) + meta := &info.ObjectMeta + meta.Description = "A's are better than B's" + err = obs.UpdateMeta("A", meta) + expectOk(t, err) +} + +func TestObjectLinks(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + root, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "ROOT"}) + expectOk(t, err) + + _, err = root.PutString("A", "AAA") + expectOk(t, err) + _, err = root.PutString("B", "BBB") + expectOk(t, err) + + info, err := root.GetInfo("A") + expectOk(t, err) + + // Self link to individual object. + _, err = root.AddLink("LA", info) + expectOk(t, err) + + dir, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "DIR"}) + expectOk(t, err) + + _, err = dir.PutString("DIR/A", "DIR-AAA") + expectOk(t, err) + _, err = dir.PutString("DIR/B", "DIR-BBB") + expectOk(t, err) + + info, err = dir.GetInfo("DIR/B") + expectOk(t, err) + + binfo, err := root.AddLink("DBL", info) + expectOk(t, err) + + if binfo.Name != "DBL" || binfo.NUID == "" || binfo.ModTime.IsZero() { + t.Fatalf("Link info not what was expected: %+v", binfo) + } + + // Now add whole other store as a link, like a directory. + _, err = root.AddBucket("dir", dir) + expectOk(t, err) + + // Now try to get a linked object. + dbl, err := root.GetString("DBL") + expectOk(t, err) + + if dbl != "DIR-BBB" { + t.Fatalf("Expected %q but got %q", "DIR-BBB", dbl) + } +}