Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

link not allowed in put meta #1057

Merged
merged 7 commits into from Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
99 changes: 63 additions & 36 deletions object.go
Expand Up @@ -75,7 +75,7 @@ type ObjectStore interface {

// GetInfo will retrieve the current information for the object.
GetInfo(name string) (*ObjectInfo, error)
// UpdateMeta will update the meta data for the object.
// UpdateMeta will update the metadata for the object.
UpdateMeta(name string, meta *ObjectMeta) error

// Delete will delete the named object.
Expand Down Expand Up @@ -133,6 +133,14 @@ var (
ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name")
ErrNameRequired = errors.New("nats: name is required")
ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2")
ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket")
ErrObjectRequired = errors.New("nats: object required")
ErrNoLinkToDeleted = errors.New("nats: not allowed to link to a deleted object")
ErrNoLinkToLink = errors.New("nats: not allowed to link to another link")
ErrCantGetBucket = errors.New("nats: invalid Get, object is a link to a bucket")
ErrBucketRequired = errors.New("nats: bucket required")
ErrBucketMalformed = errors.New("nats: bucket malformed")
ErrUpdateMetaDeleted = errors.New("nats: cannot update meta for a deleted object")
)

// ObjectStoreConfig is the config for the object store.
Expand Down Expand Up @@ -311,6 +319,14 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return nil, ErrBadObjectMeta
}

if meta.Opts == nil {
meta.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize}
} else if meta.Opts.Link != nil {
return nil, ErrLinkNotAllowed
} else if meta.Opts.ChunkSize == 0 {
meta.Opts.ChunkSize = objDefaultChunkSize
}

var o objOpts
for _, opt := range opts {
if opt != nil {
Expand Down Expand Up @@ -356,13 +372,8 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return nil, err
}

chunkSize := objDefaultChunkSize
if meta.Opts != nil && meta.Opts.ChunkSize > 0 {
chunkSize = meta.Opts.ChunkSize
}

m, h := NewMsg(chunkSubj), sha256.New()
chunk, sent, total := make([]byte, chunkSize), 0, uint64(0)
chunk, sent, total := make([]byte, meta.Opts.ChunkSize), 0, uint64(0)

// set up the info object. The chunk upload sets the size and digest
info := &ObjectInfo{Bucket: obs.name, NUID: newnuid, ObjectMeta: *meta}
Expand Down Expand Up @@ -503,7 +514,7 @@ func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) {
// Check for object links. If single objects we do a pass through.
if info.isLink() {
if info.ObjectMeta.Opts.Link.Name == _EMPTY_ {
return nil, errors.New("nats: object is a link to a bucket")
return nil, ErrCantGetBucket
}

// is the link in the same bucket?
Expand Down Expand Up @@ -613,21 +624,34 @@ func (obs *obs) Delete(name string) error {
info.Deleted = true
info.Size, info.Chunks, info.Digest = 0, 0, _EMPTY_

metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(name))
mm := NewMsg(metaSubj)
mm.Data, err = json.Marshal(info)
if err = publishMeta(info, obs.js); err != nil {
return err
}

// Purge chunks for the object.
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

func publishMeta(info *ObjectInfo, js JetStreamContext) error {
// marshal the object into json, don't store an actual time
info.ModTime = time.Time{}
data, err := json.Marshal(info)
if err != nil {
return err
}

// Prepare and publish the message.
mm := NewMsg(fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name)))
mm.Header.Set(MsgRollup, MsgRollupSubject)
_, err = obs.js.PublishMsg(mm)
if err != nil {
mm.Data = data
if _, err := js.PublishMsg(mm); err != nil {
return err
}

// Purge chunks for the object.
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
// set the ModTime in case it's returned to the user, even though it's not the correct time.
info.ModTime = time.Now().UTC()
return nil
}

// AddLink will add a link to another object if it's not deleted and not another link
Expand All @@ -637,14 +661,17 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) {
if name == "" {
return nil, ErrNameRequired
}

// TODO Handle stale info

if obj == nil || obj.Name == "" {
return nil, errors.New("nats: object required")
return nil, ErrObjectRequired
}
if obj.Deleted {
return nil, errors.New("nats: not allowed to link to a deleted object")
return nil, ErrNoLinkToDeleted
}
if obj.isLink() {
return nil, errors.New("nats: not allowed to link to another link")
return nil, ErrNoLinkToLink
}

// If object with link's name is found, error.
Expand All @@ -664,9 +691,14 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) {
Name: name,
Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: obj.Bucket, Name: obj.Name}},
}
info := &ObjectInfo{Bucket: obs.name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *meta}

// put the link object
return obs.Put(meta, nil)
if err = publishMeta(info, obs.js); err != nil {
return nil, err
}

return info, nil
}

// AddBucketLink will add a link to another object store.
Expand All @@ -675,11 +707,11 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro
return nil, ErrNameRequired
}
if bucket == nil {
return nil, errors.New("nats: bucket required")
return nil, ErrBucketRequired
}
bos, ok := bucket.(*obs)
if !ok {
return nil, errors.New("nats: bucket malformed")
return nil, ErrBucketMalformed
}

// If object with link's name is found, error.
Expand All @@ -699,9 +731,15 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro
Name: name,
Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}},
}
info := &ObjectInfo{Bucket: ob.name, NUID: nuid.Next(), ObjectMeta: *meta}

// put the link object
return ob.Put(meta, nil)
err = publishMeta(info, ob.js)
if err != nil {
return nil, err
}

return info, nil
}

// PutBytes is convenience function to put a byte slice into this object store.
Expand Down Expand Up @@ -813,12 +851,11 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error {
}

if info.Deleted {
return errors.New("nats: cannot update meta for a deleted object")
return ErrUpdateMetaDeleted
}

// If the new name is different from the old, and it exists, error
// If there was an error that was not ErrObjectNotFound, error.
// sff - Is there a better go way to do this?
if name != meta.Name {
_, err = obs.GetInfo(meta.Name)
if err != ErrObjectNotFound {
Expand All @@ -836,17 +873,7 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error {
info.Headers = meta.Headers

// Prepare the meta message
metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(meta.Name))
mm := NewMsg(metaSubj)
mm.Header.Set(MsgRollup, MsgRollupSubject)
mm.Data, err = json.Marshal(info)
if err != nil {
return err
}

// Publish the meta message.
_, err = obs.js.PublishMsg(mm)
if err != nil {
if err = publishMeta(info, obs.js); err != nil {
return err
}

Expand Down
76 changes: 55 additions & 21 deletions test/object_test.go
Expand Up @@ -35,6 +35,12 @@ func TestObjectBasics(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.CreateObjectStore(nil)
expectErr(t, err, nats.ErrObjectConfigRequired)

_, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "notok!", Description: "testing"})
expectErr(t, err, nats.ErrInvalidStoreName)

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS", Description: "testing"})
expectOk(t, err)

Expand Down Expand Up @@ -102,21 +108,22 @@ func TestObjectBasics(t *testing.T) {
if !bytes.Equal(copy, blob) {
t.Fatalf("Result not the same")
}
// Test delete.
err = js.DeleteObjectStore("OBJS")
expectOk(t, err)
_, err = obs.Get("BLOB")
expectErr(t, err, nats.ErrStreamNotFound)

// Check simple errors.
_, err = obs.Get("FOO")
expectErr(t, err)
expectErr(t, err, nats.ErrObjectNotFound)

_, err = obs.Get("")
expectErr(t, err)
expectErr(t, err, nats.ErrNameRequired)

_, err = obs.PutBytes("", blob)
expectErr(t, err)
expectErr(t, err, nats.ErrBadObjectMeta)

// Test delete.
err = js.DeleteObjectStore("OBJS")
expectOk(t, err)
_, err = obs.Get("BLOB")
expectErr(t, err, nats.ErrStreamNotFound)
}

func TestGetObjectDigestMismatch(t *testing.T) {
Expand Down Expand Up @@ -290,7 +297,7 @@ func TestObjectDeleteMarkers(t *testing.T) {
si, err := js.StreamInfo("OBJ_OBJS")
expectOk(t, err)

// We should have one message left. The delete marker.
// We should have one message left, the "delete" marker.
if si.State.Msgs != 1 {
t.Fatalf("Expected 1 marker msg, got %d msgs", si.State.Msgs)
}
Expand Down Expand Up @@ -368,7 +375,7 @@ func TestObjectNames(t *testing.T) {

// Errors
_, err = obs.PutString("", "A")
expectErr(t, err)
expectErr(t, err, nats.ErrBadObjectMeta)
}

func TestObjectMetadata(t *testing.T) {
Expand Down Expand Up @@ -425,12 +432,17 @@ func TestObjectMetadata(t *testing.T) {
err = obs.Delete("B")
expectOk(t, err)
err = obs.UpdateMeta("B", meta)
expectErr(t, err)
expectErr(t, err, nats.ErrUpdateMetaDeleted)

err = obs.UpdateMeta("X", meta)
if err == nil {
t.Fatal("Expected an error when trying to update an object that does not exist.")
}

// can't have a link when putting an object
meta.Opts = &nats.ObjectMetaOptions{Link: &nats.ObjectLink{Bucket: "DoesntMatter"}}
_, err = obs.Put(meta, nil)
expectErr(t, err, nats.ErrLinkNotAllowed)
}

func TestObjectWatch(t *testing.T) {
Expand All @@ -451,7 +463,7 @@ func TestObjectWatch(t *testing.T) {
t.Helper()
select {
case info := <-watcher.Updates():
if false && info.Name != name {
if false && info.Name != name { // TODO what is supposed to happen here?
t.Fatalf("Expected update for %q, but got %+v", name, info)
}
case <-time.After(time.Second):
Expand Down Expand Up @@ -513,7 +525,7 @@ func TestObjectWatch(t *testing.T) {
meta := &deletedInfo.ObjectMeta
meta.Description = "Making a change."
err = obs.UpdateMeta("A", meta)
expectErr(t, err)
expectErr(t, err, nats.ErrUpdateMetaDeleted)
}

func TestObjectLinks(t *testing.T) {
Expand Down Expand Up @@ -541,7 +553,7 @@ func TestObjectLinks(t *testing.T) {

// link to a link
_, err = root.AddLink("LALA", infoLA)
expectErr(t, err)
expectErr(t, err, nats.ErrNoLinkToLink)

dir, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "DIR"})
expectOk(t, err)
Expand All @@ -562,6 +574,9 @@ func TestObjectLinks(t *testing.T) {
infoBucketLink, err := root.AddBucketLink("dir", dir)
expectOk(t, err)

_, err = root.Get(infoBucketLink.Name)
expectErr(t, err, nats.ErrCantGetBucket)

expectLinkPartsAreCorrect(t, infoBucketLink, "DIR", "")

// Try to get a linked object, same bucket
Expand Down Expand Up @@ -603,25 +618,41 @@ func TestObjectLinks(t *testing.T) {

// Check simple errors.
_, err = root.AddLink("", infoB)
expectErr(t, err)
expectErr(t, err, nats.ErrNameRequired)

// A is already an object
_, err = root.AddLink("A", infoB)
expectErr(t, err, nats.ErrObjectAlreadyExists)

_, err = root.AddLink("Nil Object", nil)
expectErr(t, err)
expectErr(t, err, nats.ErrObjectRequired)

infoB.Name = ""
_, err = root.AddLink("Empty Info Name", infoB)
expectErr(t, err)
expectErr(t, err, nats.ErrObjectRequired)

// Check Error Link to a Link
_, err = root.AddLink("Link To Link", infoLB)
expectErr(t, err)
expectErr(t, err, nats.ErrNoLinkToLink)

// Check Error Link to a Link
// Check Errors on bucket linking
_, err = root.AddBucketLink("", root)
expectErr(t, err)
expectErr(t, err, nats.ErrNameRequired)

_, err = root.AddBucketLink("Nil Bucket", nil)
expectErr(t, err)
expectErr(t, err, nats.ErrBucketRequired)

err = root.Delete("A")
expectOk(t, err)

_, err = root.AddLink("ToDeletedStale", infoA)
expectOk(t, err) // TODO deal with this in the code somehow

infoA, err = root.GetInfo("A")
expectOk(t, err)

_, err = root.AddLink("ToDeletedFresh", infoA)
expectErr(t, err, nats.ErrNoLinkToDeleted)
}

func expectLinkIsCorrect(t *testing.T, originalObject *nats.ObjectInfo, linkObject *nats.ObjectInfo) {
Expand Down Expand Up @@ -681,6 +712,9 @@ func TestObjectList(t *testing.T) {
root, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "ROOT"})
expectOk(t, err)

_, err = root.List()
expectErr(t, err, nats.ErrNoObjectsFound)

put := func(name, value string) {
_, err = root.PutString(name, value)
expectOk(t, err)
Expand Down