diff --git a/object.go b/object.go index dcc468f32..a9a93b77e 100644 --- a/object.go +++ b/object.go @@ -75,7 +75,7 @@ type ObjectStore interface { // GetInfo will retrieve the current information for the object. GetInfo(name string) (*ObjectInfo, error) - // UpdateMeta will update the meta data for the object. + // UpdateMeta will update the metadata for the object. UpdateMeta(name string, meta *ObjectMeta) error // Delete will delete the named object. @@ -133,6 +133,14 @@ var ( ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name") ErrNameRequired = errors.New("nats: name is required") ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2") + ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket") + ErrObjectRequired = errors.New("nats: object required") + ErrNoLinkToDeleted = errors.New("nats: not allowed to link to a deleted object") + ErrNoLinkToLink = errors.New("nats: not allowed to link to another link") + ErrCantGetBucket = errors.New("nats: invalid Get, object is a link to a bucket") + ErrBucketRequired = errors.New("nats: bucket required") + ErrBucketMalformed = errors.New("nats: bucket malformed") + ErrUpdateMetaDeleted = errors.New("nats: cannot update meta for a deleted object") ) // ObjectStoreConfig is the config for the object store. @@ -311,6 +319,14 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn return nil, ErrBadObjectMeta } + if meta.Opts == nil { + meta.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize} + } else if meta.Opts.Link != nil { + return nil, ErrLinkNotAllowed + } else if meta.Opts.ChunkSize == 0 { + meta.Opts.ChunkSize = objDefaultChunkSize + } + var o objOpts for _, opt := range opts { if opt != nil { @@ -356,13 +372,8 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn return nil, err } - chunkSize := objDefaultChunkSize - if meta.Opts != nil && meta.Opts.ChunkSize > 0 { - chunkSize = meta.Opts.ChunkSize - } - m, h := NewMsg(chunkSubj), sha256.New() - chunk, sent, total := make([]byte, chunkSize), 0, uint64(0) + chunk, sent, total := make([]byte, meta.Opts.ChunkSize), 0, uint64(0) // set up the info object. The chunk upload sets the size and digest info := &ObjectInfo{Bucket: obs.name, NUID: newnuid, ObjectMeta: *meta} @@ -503,7 +514,7 @@ func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) { // Check for object links. If single objects we do a pass through. if info.isLink() { if info.ObjectMeta.Opts.Link.Name == _EMPTY_ { - return nil, errors.New("nats: object is a link to a bucket") + return nil, ErrCantGetBucket } // is the link in the same bucket? @@ -613,21 +624,34 @@ func (obs *obs) Delete(name string) error { info.Deleted = true info.Size, info.Chunks, info.Digest = 0, 0, _EMPTY_ - metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(name)) - mm := NewMsg(metaSubj) - mm.Data, err = json.Marshal(info) + if err = publishMeta(info, obs.js); err != nil { + return err + } + + // Purge chunks for the object. + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) + return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) +} + +func publishMeta(info *ObjectInfo, js JetStreamContext) error { + // marshal the object into json, don't store an actual time + info.ModTime = time.Time{} + data, err := json.Marshal(info) if err != nil { return err } + + // Prepare and publish the message. + mm := NewMsg(fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name))) mm.Header.Set(MsgRollup, MsgRollupSubject) - _, err = obs.js.PublishMsg(mm) - if err != nil { + mm.Data = data + if _, err := js.PublishMsg(mm); err != nil { return err } - // Purge chunks for the object. - chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) - return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) + // set the ModTime in case it's returned to the user, even though it's not the correct time. + info.ModTime = time.Now().UTC() + return nil } // AddLink will add a link to another object if it's not deleted and not another link @@ -637,14 +661,17 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) { if name == "" { return nil, ErrNameRequired } + + // TODO Handle stale info + if obj == nil || obj.Name == "" { - return nil, errors.New("nats: object required") + return nil, ErrObjectRequired } if obj.Deleted { - return nil, errors.New("nats: not allowed to link to a deleted object") + return nil, ErrNoLinkToDeleted } if obj.isLink() { - return nil, errors.New("nats: not allowed to link to another link") + return nil, ErrNoLinkToLink } // If object with link's name is found, error. @@ -664,9 +691,14 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) { Name: name, Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: obj.Bucket, Name: obj.Name}}, } + info := &ObjectInfo{Bucket: obs.name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *meta} // put the link object - return obs.Put(meta, nil) + if err = publishMeta(info, obs.js); err != nil { + return nil, err + } + + return info, nil } // AddBucketLink will add a link to another object store. @@ -675,11 +707,11 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro return nil, ErrNameRequired } if bucket == nil { - return nil, errors.New("nats: bucket required") + return nil, ErrBucketRequired } bos, ok := bucket.(*obs) if !ok { - return nil, errors.New("nats: bucket malformed") + return nil, ErrBucketMalformed } // If object with link's name is found, error. @@ -699,9 +731,15 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro Name: name, Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}}, } + info := &ObjectInfo{Bucket: ob.name, NUID: nuid.Next(), ObjectMeta: *meta} // put the link object - return ob.Put(meta, nil) + err = publishMeta(info, ob.js) + if err != nil { + return nil, err + } + + return info, nil } // PutBytes is convenience function to put a byte slice into this object store. @@ -813,12 +851,11 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { } if info.Deleted { - return errors.New("nats: cannot update meta for a deleted object") + return ErrUpdateMetaDeleted } // If the new name is different from the old, and it exists, error // If there was an error that was not ErrObjectNotFound, error. - // sff - Is there a better go way to do this? if name != meta.Name { _, err = obs.GetInfo(meta.Name) if err != ErrObjectNotFound { @@ -836,17 +873,7 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { info.Headers = meta.Headers // Prepare the meta message - metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(meta.Name)) - mm := NewMsg(metaSubj) - mm.Header.Set(MsgRollup, MsgRollupSubject) - mm.Data, err = json.Marshal(info) - if err != nil { - return err - } - - // Publish the meta message. - _, err = obs.js.PublishMsg(mm) - if err != nil { + if err = publishMeta(info, obs.js); err != nil { return err } diff --git a/test/object_test.go b/test/object_test.go index deb7527d1..5baa861ec 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -35,6 +35,12 @@ func TestObjectBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() + _, err := js.CreateObjectStore(nil) + expectErr(t, err, nats.ErrObjectConfigRequired) + + _, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "notok!", Description: "testing"}) + expectErr(t, err, nats.ErrInvalidStoreName) + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"}) expectOk(t, err) @@ -102,21 +108,22 @@ func TestObjectBasics(t *testing.T) { if !bytes.Equal(copy, blob) { t.Fatalf("Result not the same") } - // Test delete. - err = js.DeleteObjectStore("OBJS") - expectOk(t, err) - _, err = obs.Get("BLOB") - expectErr(t, err, nats.ErrStreamNotFound) // Check simple errors. _, err = obs.Get("FOO") - expectErr(t, err) + expectErr(t, err, nats.ErrObjectNotFound) _, err = obs.Get("") - expectErr(t, err) + expectErr(t, err, nats.ErrNameRequired) _, err = obs.PutBytes("", blob) - expectErr(t, err) + expectErr(t, err, nats.ErrBadObjectMeta) + + // Test delete. + err = js.DeleteObjectStore("OBJS") + expectOk(t, err) + _, err = obs.Get("BLOB") + expectErr(t, err, nats.ErrStreamNotFound) } func TestGetObjectDigestMismatch(t *testing.T) { @@ -290,7 +297,7 @@ func TestObjectDeleteMarkers(t *testing.T) { si, err := js.StreamInfo("OBJ_OBJS") expectOk(t, err) - // We should have one message left. The delete marker. + // We should have one message left, the "delete" marker. if si.State.Msgs != 1 { t.Fatalf("Expected 1 marker msg, got %d msgs", si.State.Msgs) } @@ -368,7 +375,7 @@ func TestObjectNames(t *testing.T) { // Errors _, err = obs.PutString("", "A") - expectErr(t, err) + expectErr(t, err, nats.ErrBadObjectMeta) } func TestObjectMetadata(t *testing.T) { @@ -425,12 +432,17 @@ func TestObjectMetadata(t *testing.T) { err = obs.Delete("B") expectOk(t, err) err = obs.UpdateMeta("B", meta) - expectErr(t, err) + expectErr(t, err, nats.ErrUpdateMetaDeleted) err = obs.UpdateMeta("X", meta) if err == nil { t.Fatal("Expected an error when trying to update an object that does not exist.") } + + // can't have a link when putting an object + meta.Opts = &nats.ObjectMetaOptions{Link: &nats.ObjectLink{Bucket: "DoesntMatter"}} + _, err = obs.Put(meta, nil) + expectErr(t, err, nats.ErrLinkNotAllowed) } func TestObjectWatch(t *testing.T) { @@ -451,7 +463,7 @@ func TestObjectWatch(t *testing.T) { t.Helper() select { case info := <-watcher.Updates(): - if false && info.Name != name { + if false && info.Name != name { // TODO what is supposed to happen here? t.Fatalf("Expected update for %q, but got %+v", name, info) } case <-time.After(time.Second): @@ -513,7 +525,7 @@ func TestObjectWatch(t *testing.T) { meta := &deletedInfo.ObjectMeta meta.Description = "Making a change." err = obs.UpdateMeta("A", meta) - expectErr(t, err) + expectErr(t, err, nats.ErrUpdateMetaDeleted) } func TestObjectLinks(t *testing.T) { @@ -541,7 +553,7 @@ func TestObjectLinks(t *testing.T) { // link to a link _, err = root.AddLink("LALA", infoLA) - expectErr(t, err) + expectErr(t, err, nats.ErrNoLinkToLink) dir, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "DIR"}) expectOk(t, err) @@ -562,6 +574,9 @@ func TestObjectLinks(t *testing.T) { infoBucketLink, err := root.AddBucketLink("dir", dir) expectOk(t, err) + _, err = root.Get(infoBucketLink.Name) + expectErr(t, err, nats.ErrCantGetBucket) + expectLinkPartsAreCorrect(t, infoBucketLink, "DIR", "") // Try to get a linked object, same bucket @@ -603,25 +618,41 @@ func TestObjectLinks(t *testing.T) { // Check simple errors. _, err = root.AddLink("", infoB) - expectErr(t, err) + expectErr(t, err, nats.ErrNameRequired) + + // A is already an object + _, err = root.AddLink("A", infoB) + expectErr(t, err, nats.ErrObjectAlreadyExists) _, err = root.AddLink("Nil Object", nil) - expectErr(t, err) + expectErr(t, err, nats.ErrObjectRequired) infoB.Name = "" _, err = root.AddLink("Empty Info Name", infoB) - expectErr(t, err) + expectErr(t, err, nats.ErrObjectRequired) // Check Error Link to a Link _, err = root.AddLink("Link To Link", infoLB) - expectErr(t, err) + expectErr(t, err, nats.ErrNoLinkToLink) - // Check Error Link to a Link + // Check Errors on bucket linking _, err = root.AddBucketLink("", root) - expectErr(t, err) + expectErr(t, err, nats.ErrNameRequired) _, err = root.AddBucketLink("Nil Bucket", nil) - expectErr(t, err) + expectErr(t, err, nats.ErrBucketRequired) + + err = root.Delete("A") + expectOk(t, err) + + _, err = root.AddLink("ToDeletedStale", infoA) + expectOk(t, err) // TODO deal with this in the code somehow + + infoA, err = root.GetInfo("A") + expectOk(t, err) + + _, err = root.AddLink("ToDeletedFresh", infoA) + expectErr(t, err, nats.ErrNoLinkToDeleted) } func expectLinkIsCorrect(t *testing.T, originalObject *nats.ObjectInfo, linkObject *nats.ObjectInfo) { @@ -681,6 +712,9 @@ func TestObjectList(t *testing.T) { root, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "ROOT"}) expectOk(t, err) + _, err = root.List() + expectErr(t, err, nats.ErrNoObjectsFound) + put := func(name, value string) { _, err = root.PutString(name, value) expectOk(t, err)