From b9cc941a681f07659e1c77cacd4a435383d8056a Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 30 Aug 2022 14:27:57 -0400 Subject: [PATCH 1/7] link not allowed in put meta --- object.go | 16 ++++++++++------ test/object_test.go | 5 +++++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/object.go b/object.go index dcc468f32..41a5848b8 100644 --- a/object.go +++ b/object.go @@ -133,6 +133,7 @@ var ( ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name") ErrNameRequired = errors.New("nats: name is required") ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2") + ErrLinkNotAllowed = errors.New("nats: a link is not allowed with an object") ) // ObjectStoreConfig is the config for the object store. @@ -311,6 +312,14 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn return nil, ErrBadObjectMeta } + if meta.Opts == nil { + meta.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize} + } else if meta.Opts.Link != nil { + return nil, ErrLinkNotAllowed + } else if meta.Opts.ChunkSize <= 0 { + meta.Opts.ChunkSize = objDefaultChunkSize + } + var o objOpts for _, opt := range opts { if opt != nil { @@ -356,13 +365,8 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn return nil, err } - chunkSize := objDefaultChunkSize - if meta.Opts != nil && meta.Opts.ChunkSize > 0 { - chunkSize = meta.Opts.ChunkSize - } - m, h := NewMsg(chunkSubj), sha256.New() - chunk, sent, total := make([]byte, chunkSize), 0, uint64(0) + chunk, sent, total := make([]byte, meta.Opts.ChunkSize), 0, uint64(0) // set up the info object. The chunk upload sets the size and digest info := &ObjectInfo{Bucket: obs.name, NUID: newnuid, ObjectMeta: *meta} diff --git a/test/object_test.go b/test/object_test.go index deb7527d1..32cb25c30 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -431,6 +431,11 @@ func TestObjectMetadata(t *testing.T) { if err == nil { t.Fatal("Expected an error when trying to update an object that does not exist.") } + + // can't have a link when putting an object + meta.Opts = &nats.ObjectMetaOptions{Link: &nats.ObjectLink{Bucket: "DoesntMatter"}} + _, err = obs.Put(meta, nil) + expectErr(t, err, nats.ErrLinkNotAllowed) } func TestObjectWatch(t *testing.T) { From c9136ca78d4bd60d2dc1cd2af6e8f2b2adff3a1a Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 30 Aug 2022 14:30:06 -0400 Subject: [PATCH 2/7] ChunkSize is uint so only need to check for 0 --- object.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object.go b/object.go index 41a5848b8..5d0936cc7 100644 --- a/object.go +++ b/object.go @@ -316,7 +316,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn meta.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize} } else if meta.Opts.Link != nil { return nil, ErrLinkNotAllowed - } else if meta.Opts.ChunkSize <= 0 { + } else if meta.Opts.ChunkSize == 0 { meta.Opts.ChunkSize = objDefaultChunkSize } From 5309e5d59ac3d458854112af82790e03eb808985 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 31 Aug 2022 12:51:35 -0400 Subject: [PATCH 3/7] add link / add bucket link don't need to call Put since there is no data, they can just publish the object info (meta) --- object.go | 42 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/object.go b/object.go index 5d0936cc7..a7cbe1f30 100644 --- a/object.go +++ b/object.go @@ -133,7 +133,7 @@ var ( ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name") ErrNameRequired = errors.New("nats: name is required") ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2") - ErrLinkNotAllowed = errors.New("nats: a link is not allowed with an object") + ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket") ) // ObjectStoreConfig is the config for the object store. @@ -634,6 +634,30 @@ func (obs *obs) Delete(name string) error { return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) } +func publishMeta(info *ObjectInfo, js JetStreamContext) error { + // Prepare the meta message + metaSubj := fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name)) + mm := NewMsg(metaSubj) + mm.Header.Set(MsgRollup, MsgRollupSubject) + + info.ModTime = time.Time{} // We don't store this, just do it after publish + data, err := json.Marshal(info) + if err != nil { + return err + } + + // Publish the meta message. + mm.Data = data + _, err = js.PublishMsg(mm) + if err != nil { + return err + } + + // set the ModTime in case it's returned to the user, even though it's not the correct time. + info.ModTime = time.Now().UTC() + return nil +} + // AddLink will add a link to another object if it's not deleted and not another link // name is the name of this link object // obj is what is being linked too @@ -668,9 +692,15 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) { Name: name, Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: obj.Bucket, Name: obj.Name}}, } + info := &ObjectInfo{Bucket: obs.name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *meta} // put the link object - return obs.Put(meta, nil) + err = publishMeta(info, obs.js) + if err != nil { + return nil, err + } + + return info, nil } // AddBucketLink will add a link to another object store. @@ -703,9 +733,15 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro Name: name, Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}}, } + info := &ObjectInfo{Bucket: ob.name, NUID: nuid.Next(), ObjectMeta: *meta} // put the link object - return ob.Put(meta, nil) + err = publishMeta(info, ob.js) + if err != nil { + return nil, err + } + + return info, nil } // PutBytes is convenience function to put a byte slice into this object store. From 01f561fc35e993053106fea0a46f0e581015ee88 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 31 Aug 2022 12:54:18 -0400 Subject: [PATCH 4/7] fixed typo --- object.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object.go b/object.go index a7cbe1f30..c9071ceec 100644 --- a/object.go +++ b/object.go @@ -75,7 +75,7 @@ type ObjectStore interface { // GetInfo will retrieve the current information for the object. GetInfo(name string) (*ObjectInfo, error) - // UpdateMeta will update the meta data for the object. + // UpdateMeta will update the metadata for the object. UpdateMeta(name string, meta *ObjectMeta) error // Delete will delete the named object. From cae283b1a64da95a28f04873dbf52c8b11871ac0 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 31 Aug 2022 13:49:38 -0400 Subject: [PATCH 5/7] addressed review comments --- object.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/object.go b/object.go index c9071ceec..b9cc3dd1b 100644 --- a/object.go +++ b/object.go @@ -648,8 +648,7 @@ func publishMeta(info *ObjectInfo, js JetStreamContext) error { // Publish the meta message. mm.Data = data - _, err = js.PublishMsg(mm) - if err != nil { + if _, err := js.PublishMsg(mm); err != nil { return err } @@ -695,8 +694,7 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) { info := &ObjectInfo{Bucket: obs.name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *meta} // put the link object - err = publishMeta(info, obs.js) - if err != nil { + if err = publishMeta(info, obs.js); err != nil { return nil, err } From 61f0f7f2077c8deba99ead3b226704ff47ee4903 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 31 Aug 2022 18:15:57 -0400 Subject: [PATCH 6/7] Inlined and used publishMeta whenever possible. Converted all errors to vars. Updated unit test to check and cover errors. --- object.go | 59 +++++++++++++++-------------------- test/object_test.go | 75 +++++++++++++++++++++++++++++++-------------- 2 files changed, 76 insertions(+), 58 deletions(-) diff --git a/object.go b/object.go index b9cc3dd1b..a9a93b77e 100644 --- a/object.go +++ b/object.go @@ -134,6 +134,13 @@ var ( ErrNameRequired = errors.New("nats: name is required") ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2") ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket") + ErrObjectRequired = errors.New("nats: object required") + ErrNoLinkToDeleted = errors.New("nats: not allowed to link to a deleted object") + ErrNoLinkToLink = errors.New("nats: not allowed to link to another link") + ErrCantGetBucket = errors.New("nats: invalid Get, object is a link to a bucket") + ErrBucketRequired = errors.New("nats: bucket required") + ErrBucketMalformed = errors.New("nats: bucket malformed") + ErrUpdateMetaDeleted = errors.New("nats: cannot update meta for a deleted object") ) // ObjectStoreConfig is the config for the object store. @@ -507,7 +514,7 @@ func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) { // Check for object links. If single objects we do a pass through. if info.isLink() { if info.ObjectMeta.Opts.Link.Name == _EMPTY_ { - return nil, errors.New("nats: object is a link to a bucket") + return nil, ErrCantGetBucket } // is the link in the same bucket? @@ -617,15 +624,7 @@ func (obs *obs) Delete(name string) error { info.Deleted = true info.Size, info.Chunks, info.Digest = 0, 0, _EMPTY_ - metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(name)) - mm := NewMsg(metaSubj) - mm.Data, err = json.Marshal(info) - if err != nil { - return err - } - mm.Header.Set(MsgRollup, MsgRollupSubject) - _, err = obs.js.PublishMsg(mm) - if err != nil { + if err = publishMeta(info, obs.js); err != nil { return err } @@ -635,18 +634,16 @@ func (obs *obs) Delete(name string) error { } func publishMeta(info *ObjectInfo, js JetStreamContext) error { - // Prepare the meta message - metaSubj := fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name)) - mm := NewMsg(metaSubj) - mm.Header.Set(MsgRollup, MsgRollupSubject) - - info.ModTime = time.Time{} // We don't store this, just do it after publish + // marshal the object into json, don't store an actual time + info.ModTime = time.Time{} data, err := json.Marshal(info) if err != nil { return err } - // Publish the meta message. + // Prepare and publish the message. + mm := NewMsg(fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name))) + mm.Header.Set(MsgRollup, MsgRollupSubject) mm.Data = data if _, err := js.PublishMsg(mm); err != nil { return err @@ -664,14 +661,17 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) { if name == "" { return nil, ErrNameRequired } + + // TODO Handle stale info + if obj == nil || obj.Name == "" { - return nil, errors.New("nats: object required") + return nil, ErrObjectRequired } if obj.Deleted { - return nil, errors.New("nats: not allowed to link to a deleted object") + return nil, ErrNoLinkToDeleted } if obj.isLink() { - return nil, errors.New("nats: not allowed to link to another link") + return nil, ErrNoLinkToLink } // If object with link's name is found, error. @@ -707,11 +707,11 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro return nil, ErrNameRequired } if bucket == nil { - return nil, errors.New("nats: bucket required") + return nil, ErrBucketRequired } bos, ok := bucket.(*obs) if !ok { - return nil, errors.New("nats: bucket malformed") + return nil, ErrBucketMalformed } // If object with link's name is found, error. @@ -851,12 +851,11 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { } if info.Deleted { - return errors.New("nats: cannot update meta for a deleted object") + return ErrUpdateMetaDeleted } // If the new name is different from the old, and it exists, error // If there was an error that was not ErrObjectNotFound, error. - // sff - Is there a better go way to do this? if name != meta.Name { _, err = obs.GetInfo(meta.Name) if err != ErrObjectNotFound { @@ -874,17 +873,7 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { info.Headers = meta.Headers // Prepare the meta message - metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(meta.Name)) - mm := NewMsg(metaSubj) - mm.Header.Set(MsgRollup, MsgRollupSubject) - mm.Data, err = json.Marshal(info) - if err != nil { - return err - } - - // Publish the meta message. - _, err = obs.js.PublishMsg(mm) - if err != nil { + if err = publishMeta(info, obs.js); err != nil { return err } diff --git a/test/object_test.go b/test/object_test.go index 32cb25c30..a8d321949 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -35,7 +35,13 @@ func TestObjectBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"}) + obs, err := js.CreateObjectStore(nil) + expectErr(t, err, nats.ErrObjectConfigRequired) + + obs, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "notok!", Description: "testing"}) + expectErr(t, err, nats.ErrInvalidStoreName) + + obs, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"}) expectOk(t, err) // Create ~16MB object. @@ -102,21 +108,22 @@ func TestObjectBasics(t *testing.T) { if !bytes.Equal(copy, blob) { t.Fatalf("Result not the same") } - // Test delete. - err = js.DeleteObjectStore("OBJS") - expectOk(t, err) - _, err = obs.Get("BLOB") - expectErr(t, err, nats.ErrStreamNotFound) // Check simple errors. _, err = obs.Get("FOO") - expectErr(t, err) + expectErr(t, err, nats.ErrObjectNotFound) _, err = obs.Get("") - expectErr(t, err) + expectErr(t, err, nats.ErrNameRequired) _, err = obs.PutBytes("", blob) - expectErr(t, err) + expectErr(t, err, nats.ErrBadObjectMeta) + + // Test delete. + err = js.DeleteObjectStore("OBJS") + expectOk(t, err) + _, err = obs.Get("BLOB") + expectErr(t, err, nats.ErrStreamNotFound) } func TestGetObjectDigestMismatch(t *testing.T) { @@ -290,7 +297,7 @@ func TestObjectDeleteMarkers(t *testing.T) { si, err := js.StreamInfo("OBJ_OBJS") expectOk(t, err) - // We should have one message left. The delete marker. + // We should have one message left, the "delete" marker. if si.State.Msgs != 1 { t.Fatalf("Expected 1 marker msg, got %d msgs", si.State.Msgs) } @@ -368,7 +375,7 @@ func TestObjectNames(t *testing.T) { // Errors _, err = obs.PutString("", "A") - expectErr(t, err) + expectErr(t, err, nats.ErrBadObjectMeta) } func TestObjectMetadata(t *testing.T) { @@ -425,7 +432,7 @@ func TestObjectMetadata(t *testing.T) { err = obs.Delete("B") expectOk(t, err) err = obs.UpdateMeta("B", meta) - expectErr(t, err) + expectErr(t, err, nats.ErrUpdateMetaDeleted) err = obs.UpdateMeta("X", meta) if err == nil { @@ -456,7 +463,7 @@ func TestObjectWatch(t *testing.T) { t.Helper() select { case info := <-watcher.Updates(): - if false && info.Name != name { + if false && info.Name != name { // TODO what is supposed to happen here? t.Fatalf("Expected update for %q, but got %+v", name, info) } case <-time.After(time.Second): @@ -518,7 +525,7 @@ func TestObjectWatch(t *testing.T) { meta := &deletedInfo.ObjectMeta meta.Description = "Making a change." err = obs.UpdateMeta("A", meta) - expectErr(t, err) + expectErr(t, err, nats.ErrUpdateMetaDeleted) } func TestObjectLinks(t *testing.T) { @@ -546,7 +553,7 @@ func TestObjectLinks(t *testing.T) { // link to a link _, err = root.AddLink("LALA", infoLA) - expectErr(t, err) + expectErr(t, err, nats.ErrNoLinkToLink) dir, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "DIR"}) expectOk(t, err) @@ -567,6 +574,9 @@ func TestObjectLinks(t *testing.T) { infoBucketLink, err := root.AddBucketLink("dir", dir) expectOk(t, err) + _, err = root.Get(infoBucketLink.Name) + expectErr(t, err, nats.ErrCantGetBucket) + expectLinkPartsAreCorrect(t, infoBucketLink, "DIR", "") // Try to get a linked object, same bucket @@ -608,25 +618,41 @@ func TestObjectLinks(t *testing.T) { // Check simple errors. _, err = root.AddLink("", infoB) - expectErr(t, err) + expectErr(t, err, nats.ErrNameRequired) + + // A is already an object + _, err = root.AddLink("A", infoB) + expectErr(t, err, nats.ErrObjectAlreadyExists) _, err = root.AddLink("Nil Object", nil) - expectErr(t, err) + expectErr(t, err, nats.ErrObjectRequired) infoB.Name = "" _, err = root.AddLink("Empty Info Name", infoB) - expectErr(t, err) + expectErr(t, err, nats.ErrObjectRequired) // Check Error Link to a Link _, err = root.AddLink("Link To Link", infoLB) - expectErr(t, err) + expectErr(t, err, nats.ErrNoLinkToLink) - // Check Error Link to a Link + // Check Errors on bucket linking _, err = root.AddBucketLink("", root) - expectErr(t, err) + expectErr(t, err, nats.ErrNameRequired) _, err = root.AddBucketLink("Nil Bucket", nil) - expectErr(t, err) + expectErr(t, err, nats.ErrBucketRequired) + + err = root.Delete("A") + expectOk(t, err) + + _, err = root.AddLink("ToDeletedStale", infoA) + expectOk(t, err) // TODO deal with this in the code somehow + + infoA, err = root.GetInfo("A") + expectOk(t, err) + + _, err = root.AddLink("ToDeletedFresh", infoA) + expectErr(t, err, nats.ErrNoLinkToDeleted) } func expectLinkIsCorrect(t *testing.T, originalObject *nats.ObjectInfo, linkObject *nats.ObjectInfo) { @@ -686,6 +712,9 @@ func TestObjectList(t *testing.T) { root, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "ROOT"}) expectOk(t, err) + lch, err := root.List() + expectErr(t, err, nats.ErrNoObjectsFound) + put := func(name, value string) { _, err = root.PutString(name, value) expectOk(t, err) @@ -706,7 +735,7 @@ func TestObjectList(t *testing.T) { err = root.Delete("D") expectOk(t, err) - lch, err := root.List() + lch, err = root.List() expectOk(t, err) omap := make(map[string]struct{}) From e5e62fecb9ffca00f027b6f094025f6b4d65173d Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 1 Sep 2022 06:30:12 -0400 Subject: [PATCH 7/7] static check --- test/object_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/object_test.go b/test/object_test.go index a8d321949..5baa861ec 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -35,13 +35,13 @@ func TestObjectBasics(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - obs, err := js.CreateObjectStore(nil) + _, err := js.CreateObjectStore(nil) expectErr(t, err, nats.ErrObjectConfigRequired) - obs, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "notok!", Description: "testing"}) + _, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "notok!", Description: "testing"}) expectErr(t, err, nats.ErrInvalidStoreName) - obs, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"}) + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"}) expectOk(t, err) // Create ~16MB object. @@ -712,7 +712,7 @@ func TestObjectList(t *testing.T) { root, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "ROOT"}) expectOk(t, err) - lch, err := root.List() + _, err = root.List() expectErr(t, err, nats.ErrNoObjectsFound) put := func(name, value string) { @@ -735,7 +735,7 @@ func TestObjectList(t *testing.T) { err = root.Delete("D") expectOk(t, err) - lch, err = root.List() + lch, err := root.List() expectOk(t, err) omap := make(map[string]struct{})