Skip to content

Commit

Permalink
Link not allowed in put meta (#1057)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 1, 2022
1 parent 5d4f44e commit 84e2c74
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 57 deletions.
99 changes: 63 additions & 36 deletions object.go
Expand Up @@ -75,7 +75,7 @@ type ObjectStore interface {

// GetInfo will retrieve the current information for the object.
GetInfo(name string) (*ObjectInfo, error)
// UpdateMeta will update the meta data for the object.
// UpdateMeta will update the metadata for the object.
UpdateMeta(name string, meta *ObjectMeta) error

// Delete will delete the named object.
Expand Down Expand Up @@ -133,6 +133,14 @@ var (
ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name")
ErrNameRequired = errors.New("nats: name is required")
ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2")
ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket")
ErrObjectRequired = errors.New("nats: object required")
ErrNoLinkToDeleted = errors.New("nats: not allowed to link to a deleted object")
ErrNoLinkToLink = errors.New("nats: not allowed to link to another link")
ErrCantGetBucket = errors.New("nats: invalid Get, object is a link to a bucket")
ErrBucketRequired = errors.New("nats: bucket required")
ErrBucketMalformed = errors.New("nats: bucket malformed")
ErrUpdateMetaDeleted = errors.New("nats: cannot update meta for a deleted object")
)

// ObjectStoreConfig is the config for the object store.
Expand Down Expand Up @@ -311,6 +319,14 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return nil, ErrBadObjectMeta
}

if meta.Opts == nil {
meta.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize}
} else if meta.Opts.Link != nil {
return nil, ErrLinkNotAllowed
} else if meta.Opts.ChunkSize == 0 {
meta.Opts.ChunkSize = objDefaultChunkSize
}

var o objOpts
for _, opt := range opts {
if opt != nil {
Expand Down Expand Up @@ -356,13 +372,8 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return nil, err
}

chunkSize := objDefaultChunkSize
if meta.Opts != nil && meta.Opts.ChunkSize > 0 {
chunkSize = meta.Opts.ChunkSize
}

m, h := NewMsg(chunkSubj), sha256.New()
chunk, sent, total := make([]byte, chunkSize), 0, uint64(0)
chunk, sent, total := make([]byte, meta.Opts.ChunkSize), 0, uint64(0)

// set up the info object. The chunk upload sets the size and digest
info := &ObjectInfo{Bucket: obs.name, NUID: newnuid, ObjectMeta: *meta}
Expand Down Expand Up @@ -503,7 +514,7 @@ func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) {
// Check for object links. If single objects we do a pass through.
if info.isLink() {
if info.ObjectMeta.Opts.Link.Name == _EMPTY_ {
return nil, errors.New("nats: object is a link to a bucket")
return nil, ErrCantGetBucket
}

// is the link in the same bucket?
Expand Down Expand Up @@ -613,21 +624,34 @@ func (obs *obs) Delete(name string) error {
info.Deleted = true
info.Size, info.Chunks, info.Digest = 0, 0, _EMPTY_

metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(name))
mm := NewMsg(metaSubj)
mm.Data, err = json.Marshal(info)
if err = publishMeta(info, obs.js); err != nil {
return err
}

// Purge chunks for the object.
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

func publishMeta(info *ObjectInfo, js JetStreamContext) error {
// marshal the object into json, don't store an actual time
info.ModTime = time.Time{}
data, err := json.Marshal(info)
if err != nil {
return err
}

// Prepare and publish the message.
mm := NewMsg(fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name)))
mm.Header.Set(MsgRollup, MsgRollupSubject)
_, err = obs.js.PublishMsg(mm)
if err != nil {
mm.Data = data
if _, err := js.PublishMsg(mm); err != nil {
return err
}

// Purge chunks for the object.
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
// set the ModTime in case it's returned to the user, even though it's not the correct time.
info.ModTime = time.Now().UTC()
return nil
}

// AddLink will add a link to another object if it's not deleted and not another link
Expand All @@ -637,14 +661,17 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) {
if name == "" {
return nil, ErrNameRequired
}

// TODO Handle stale info

if obj == nil || obj.Name == "" {
return nil, errors.New("nats: object required")
return nil, ErrObjectRequired
}
if obj.Deleted {
return nil, errors.New("nats: not allowed to link to a deleted object")
return nil, ErrNoLinkToDeleted
}
if obj.isLink() {
return nil, errors.New("nats: not allowed to link to another link")
return nil, ErrNoLinkToLink
}

// If object with link's name is found, error.
Expand All @@ -664,9 +691,14 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) {
Name: name,
Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: obj.Bucket, Name: obj.Name}},
}
info := &ObjectInfo{Bucket: obs.name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *meta}

// put the link object
return obs.Put(meta, nil)
if err = publishMeta(info, obs.js); err != nil {
return nil, err
}

return info, nil
}

// AddBucketLink will add a link to another object store.
Expand All @@ -675,11 +707,11 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro
return nil, ErrNameRequired
}
if bucket == nil {
return nil, errors.New("nats: bucket required")
return nil, ErrBucketRequired
}
bos, ok := bucket.(*obs)
if !ok {
return nil, errors.New("nats: bucket malformed")
return nil, ErrBucketMalformed
}

// If object with link's name is found, error.
Expand All @@ -699,9 +731,15 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro
Name: name,
Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}},
}
info := &ObjectInfo{Bucket: ob.name, NUID: nuid.Next(), ObjectMeta: *meta}

// put the link object
return ob.Put(meta, nil)
err = publishMeta(info, ob.js)
if err != nil {
return nil, err
}

return info, nil
}

// PutBytes is convenience function to put a byte slice into this object store.
Expand Down Expand Up @@ -813,12 +851,11 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error {
}

if info.Deleted {
return errors.New("nats: cannot update meta for a deleted object")
return ErrUpdateMetaDeleted
}

// If the new name is different from the old, and it exists, error
// If there was an error that was not ErrObjectNotFound, error.
// sff - Is there a better go way to do this?
if name != meta.Name {
_, err = obs.GetInfo(meta.Name)
if err != ErrObjectNotFound {
Expand All @@ -836,17 +873,7 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error {
info.Headers = meta.Headers

// Prepare the meta message
metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(meta.Name))
mm := NewMsg(metaSubj)
mm.Header.Set(MsgRollup, MsgRollupSubject)
mm.Data, err = json.Marshal(info)
if err != nil {
return err
}

// Publish the meta message.
_, err = obs.js.PublishMsg(mm)
if err != nil {
if err = publishMeta(info, obs.js); err != nil {
return err
}

Expand Down
76 changes: 55 additions & 21 deletions test/object_test.go
Expand Up @@ -35,6 +35,12 @@ func TestObjectBasics(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.CreateObjectStore(nil)
expectErr(t, err, nats.ErrObjectConfigRequired)

_, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "notok!", Description: "testing"})
expectErr(t, err, nats.ErrInvalidStoreName)

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"})
expectOk(t, err)

Expand Down Expand Up @@ -102,21 +108,22 @@ func TestObjectBasics(t *testing.T) {
if !bytes.Equal(copy, blob) {
t.Fatalf("Result not the same")
}
// Test delete.
err = js.DeleteObjectStore("OBJS")
expectOk(t, err)
_, err = obs.Get("BLOB")
expectErr(t, err, nats.ErrStreamNotFound)

// Check simple errors.
_, err = obs.Get("FOO")
expectErr(t, err)
expectErr(t, err, nats.ErrObjectNotFound)

_, err = obs.Get("")
expectErr(t, err)
expectErr(t, err, nats.ErrNameRequired)

_, err = obs.PutBytes("", blob)
expectErr(t, err)
expectErr(t, err, nats.ErrBadObjectMeta)

// Test delete.
err = js.DeleteObjectStore("OBJS")
expectOk(t, err)
_, err = obs.Get("BLOB")
expectErr(t, err, nats.ErrStreamNotFound)
}

func TestGetObjectDigestMismatch(t *testing.T) {
Expand Down Expand Up @@ -290,7 +297,7 @@ func TestObjectDeleteMarkers(t *testing.T) {
si, err := js.StreamInfo("OBJ_OBJS")
expectOk(t, err)

// We should have one message left. The delete marker.
// We should have one message left, the "delete" marker.
if si.State.Msgs != 1 {
t.Fatalf("Expected 1 marker msg, got %d msgs", si.State.Msgs)
}
Expand Down Expand Up @@ -368,7 +375,7 @@ func TestObjectNames(t *testing.T) {

// Errors
_, err = obs.PutString("", "A")
expectErr(t, err)
expectErr(t, err, nats.ErrBadObjectMeta)
}

func TestObjectMetadata(t *testing.T) {
Expand Down Expand Up @@ -425,12 +432,17 @@ func TestObjectMetadata(t *testing.T) {
err = obs.Delete("B")
expectOk(t, err)
err = obs.UpdateMeta("B", meta)
expectErr(t, err)
expectErr(t, err, nats.ErrUpdateMetaDeleted)

err = obs.UpdateMeta("X", meta)
if err == nil {
t.Fatal("Expected an error when trying to update an object that does not exist.")
}

// can't have a link when putting an object
meta.Opts = &nats.ObjectMetaOptions{Link: &nats.ObjectLink{Bucket: "DoesntMatter"}}
_, err = obs.Put(meta, nil)
expectErr(t, err, nats.ErrLinkNotAllowed)
}

func TestObjectWatch(t *testing.T) {
Expand All @@ -451,7 +463,7 @@ func TestObjectWatch(t *testing.T) {
t.Helper()
select {
case info := <-watcher.Updates():
if false && info.Name != name {
if false && info.Name != name { // TODO what is supposed to happen here?
t.Fatalf("Expected update for %q, but got %+v", name, info)
}
case <-time.After(time.Second):
Expand Down Expand Up @@ -513,7 +525,7 @@ func TestObjectWatch(t *testing.T) {
meta := &deletedInfo.ObjectMeta
meta.Description = "Making a change."
err = obs.UpdateMeta("A", meta)
expectErr(t, err)
expectErr(t, err, nats.ErrUpdateMetaDeleted)
}

func TestObjectLinks(t *testing.T) {
Expand Down Expand Up @@ -541,7 +553,7 @@ func TestObjectLinks(t *testing.T) {

// link to a link
_, err = root.AddLink("LALA", infoLA)
expectErr(t, err)
expectErr(t, err, nats.ErrNoLinkToLink)

dir, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "DIR"})
expectOk(t, err)
Expand All @@ -562,6 +574,9 @@ func TestObjectLinks(t *testing.T) {
infoBucketLink, err := root.AddBucketLink("dir", dir)
expectOk(t, err)

_, err = root.Get(infoBucketLink.Name)
expectErr(t, err, nats.ErrCantGetBucket)

expectLinkPartsAreCorrect(t, infoBucketLink, "DIR", "")

// Try to get a linked object, same bucket
Expand Down Expand Up @@ -603,25 +618,41 @@ func TestObjectLinks(t *testing.T) {

// Check simple errors.
_, err = root.AddLink("", infoB)
expectErr(t, err)
expectErr(t, err, nats.ErrNameRequired)

// A is already an object
_, err = root.AddLink("A", infoB)
expectErr(t, err, nats.ErrObjectAlreadyExists)

_, err = root.AddLink("Nil Object", nil)
expectErr(t, err)
expectErr(t, err, nats.ErrObjectRequired)

infoB.Name = ""
_, err = root.AddLink("Empty Info Name", infoB)
expectErr(t, err)
expectErr(t, err, nats.ErrObjectRequired)

// Check Error Link to a Link
_, err = root.AddLink("Link To Link", infoLB)
expectErr(t, err)
expectErr(t, err, nats.ErrNoLinkToLink)

// Check Error Link to a Link
// Check Errors on bucket linking
_, err = root.AddBucketLink("", root)
expectErr(t, err)
expectErr(t, err, nats.ErrNameRequired)

_, err = root.AddBucketLink("Nil Bucket", nil)
expectErr(t, err)
expectErr(t, err, nats.ErrBucketRequired)

err = root.Delete("A")
expectOk(t, err)

_, err = root.AddLink("ToDeletedStale", infoA)
expectOk(t, err) // TODO deal with this in the code somehow

infoA, err = root.GetInfo("A")
expectOk(t, err)

_, err = root.AddLink("ToDeletedFresh", infoA)
expectErr(t, err, nats.ErrNoLinkToDeleted)
}

func expectLinkIsCorrect(t *testing.T, originalObject *nats.ObjectInfo, linkObject *nats.ObjectInfo) {
Expand Down Expand Up @@ -681,6 +712,9 @@ func TestObjectList(t *testing.T) {
root, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "ROOT"})
expectOk(t, err)

_, err = root.List()
expectErr(t, err, nats.ErrNoObjectsFound)

put := func(name, value string) {
_, err = root.PutString(name, value)
expectOk(t, err)
Expand Down

0 comments on commit 84e2c74

Please sign in to comment.