Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

link not allowed in put meta #1057

Merged
merged 7 commits into from Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 49 additions & 9 deletions object.go
Expand Up @@ -75,7 +75,7 @@ type ObjectStore interface {

// GetInfo will retrieve the current information for the object.
GetInfo(name string) (*ObjectInfo, error)
// UpdateMeta will update the meta data for the object.
// UpdateMeta will update the metadata for the object.
UpdateMeta(name string, meta *ObjectMeta) error

// Delete will delete the named object.
Expand Down Expand Up @@ -133,6 +133,7 @@ var (
ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name")
ErrNameRequired = errors.New("nats: name is required")
ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2")
ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket")
)

// ObjectStoreConfig is the config for the object store.
Expand Down Expand Up @@ -311,6 +312,14 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return nil, ErrBadObjectMeta
}

if meta.Opts == nil {
meta.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize}
} else if meta.Opts.Link != nil {
return nil, ErrLinkNotAllowed
} else if meta.Opts.ChunkSize == 0 {
meta.Opts.ChunkSize = objDefaultChunkSize
}

var o objOpts
for _, opt := range opts {
if opt != nil {
Expand Down Expand Up @@ -356,13 +365,8 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return nil, err
}

chunkSize := objDefaultChunkSize
if meta.Opts != nil && meta.Opts.ChunkSize > 0 {
chunkSize = meta.Opts.ChunkSize
}

m, h := NewMsg(chunkSubj), sha256.New()
chunk, sent, total := make([]byte, chunkSize), 0, uint64(0)
chunk, sent, total := make([]byte, meta.Opts.ChunkSize), 0, uint64(0)

// set up the info object. The chunk upload sets the size and digest
info := &ObjectInfo{Bucket: obs.name, NUID: newnuid, ObjectMeta: *meta}
Expand Down Expand Up @@ -630,6 +634,30 @@ func (obs *obs) Delete(name string) error {
return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

func publishMeta(info *ObjectInfo, js JetStreamContext) error {
// Prepare the meta message
metaSubj := fmt.Sprintf(objMetaPreTmpl, info.Bucket, encodeName(info.ObjectMeta.Name))
mm := NewMsg(metaSubj)
mm.Header.Set(MsgRollup, MsgRollupSubject)

info.ModTime = time.Time{} // We don't store this, just do it after publish
piotrpio marked this conversation as resolved.
Show resolved Hide resolved
data, err := json.Marshal(info)
if err != nil {
return err
}

// Publish the meta message.
mm.Data = data
_, err = js.PublishMsg(mm)
piotrpio marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

// set the ModTime in case it's returned to the user, even though it's not the correct time.
info.ModTime = time.Now().UTC()
return nil
}

// AddLink will add a link to another object if it's not deleted and not another link
// name is the name of this link object
// obj is what is being linked too
Expand Down Expand Up @@ -664,9 +692,15 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) {
Name: name,
Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: obj.Bucket, Name: obj.Name}},
}
info := &ObjectInfo{Bucket: obs.name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *meta}

// put the link object
return obs.Put(meta, nil)
err = publishMeta(info, obs.js)
piotrpio marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

return info, nil
}

// AddBucketLink will add a link to another object store.
Expand Down Expand Up @@ -699,9 +733,15 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro
Name: name,
Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}},
}
info := &ObjectInfo{Bucket: ob.name, NUID: nuid.Next(), ObjectMeta: *meta}

// put the link object
return ob.Put(meta, nil)
err = publishMeta(info, ob.js)
if err != nil {
return nil, err
}

return info, nil
}

// PutBytes is convenience function to put a byte slice into this object store.
Expand Down
5 changes: 5 additions & 0 deletions test/object_test.go
Expand Up @@ -431,6 +431,11 @@ func TestObjectMetadata(t *testing.T) {
if err == nil {
t.Fatal("Expected an error when trying to update an object that does not exist.")
}

// can't have a link when putting an object
meta.Opts = &nats.ObjectMetaOptions{Link: &nats.ObjectLink{Bucket: "DoesntMatter"}}
_, err = obs.Put(meta, nil)
expectErr(t, err, nats.ErrLinkNotAllowed)
}

func TestObjectWatch(t *testing.T) {
Expand Down