Skip to content

Commit

Permalink
[CHANGED] Hide deleted objects by default (#1091)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Sep 21, 2022
1 parent 4832b65 commit f410242
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 62 deletions.
201 changes: 163 additions & 38 deletions object.go
Expand Up @@ -60,25 +60,25 @@ type ObjectStore interface {
// Put will place the contents from the reader into a new object.
Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error)
// Get will pull the named object from the object store.
Get(name string, opts ...ObjectOpt) (ObjectResult, error)
Get(name string, opts ...GetObjectOpt) (ObjectResult, error)

// PutBytes is convenience function to put a byte slice into this object store.
PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error)
// GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
GetBytes(name string, opts ...ObjectOpt) ([]byte, error)
GetBytes(name string, opts ...GetObjectOpt) ([]byte, error)

// PutString is convenience function to put a string into this object store.
PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error)
// GetString is a convenience function to pull an object from this object store and return it as a string.
GetString(name string, opts ...ObjectOpt) (string, error)
GetString(name string, opts ...GetObjectOpt) (string, error)

// PutFile is convenience function to put a file into this object store.
PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error)
// GetFile is a convenience function to pull an object from this object store and place it in a file.
GetFile(name, file string, opts ...ObjectOpt) error
GetFile(name, file string, opts ...GetObjectOpt) error

// GetInfo will retrieve the current information for the object.
GetInfo(name string) (*ObjectInfo, error)
GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error)
// UpdateMeta will update the metadata for the object.
UpdateMeta(name string, meta *ObjectMeta) error

Expand All @@ -98,7 +98,7 @@ type ObjectStore interface {
Watch(opts ...WatchOpt) (ObjectWatcher, error)

// List will list all the objects in this store.
List(opts ...WatchOpt) ([]*ObjectInfo, error)
List(opts ...ListObjectsOpt) ([]*ObjectInfo, error)

// Status retrieves run-time status about the backing store of the bucket.
Status() (ObjectStoreStatus, error)
Expand Down Expand Up @@ -349,7 +349,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn

// Grab existing meta info (einfo). Ok to be found or not found, any other error is a problem
// Chunks on the old nuid can be cleaned up at the end
einfo, err := obs.GetInfo(meta.Name) // GetInfo will encode the name
einfo, err := obs.GetInfo(meta.Name, GetObjectInfoShowDeleted()) // GetInfo will encode the name
if err != nil && err != ErrObjectNotFound {
return nil, err
}
Expand Down Expand Up @@ -517,10 +517,56 @@ func (info *ObjectInfo) isLink() bool {
return info.ObjectMeta.Opts != nil && info.ObjectMeta.Opts.Link != nil
}

type GetObjectOpt interface {
configureGetObject(opts *getObjectOpts) error
}
type getObjectOpts struct {
ctx context.Context
// Include deleted object in the result.
showDeleted bool
}

type getObjectFn func(opts *getObjectOpts) error

func (opt getObjectFn) configureGetObject(opts *getObjectOpts) error {
return opt(opts)
}

// GetObjectShowDeleted makes Get() return object if it was marked as deleted.
func GetObjectShowDeleted() GetObjectOpt {
return getObjectFn(func(opts *getObjectOpts) error {
opts.showDeleted = true
return nil
})
}

// For nats.Context() support.
func (ctx ContextOpt) configureGetObject(opts *getObjectOpts) error {
opts.ctx = ctx
return nil
}

// Get will pull the object from the underlying stream.
func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) {
func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
var o getObjectOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureGetObject(&o); err != nil {
return nil, err
}
}
}
ctx := o.ctx
infoOpts := make([]GetObjectInfoOpt, 0)
if ctx != nil {
infoOpts = append(infoOpts, Context(ctx))
}
if o.showDeleted {
infoOpts = append(infoOpts, GetObjectInfoShowDeleted())
}

// Grab meta info.
info, err := obs.GetInfo(name)
info, err := obs.GetInfo(name, infoOpts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -548,16 +594,6 @@ func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) {
return lobs.Get(info.ObjectMeta.Opts.Link.Name)
}

var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil, err
}
}
}
ctx := o.ctx

result := &objResult{info: info, ctx: ctx}
if info.Size == 0 {
return result, nil
Expand Down Expand Up @@ -629,7 +665,7 @@ func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) {
// Delete will delete the object.
func (obs *obs) Delete(name string) error {
// Grab meta info.
info, err := obs.GetInfo(name)
info, err := obs.GetInfo(name, GetObjectInfoShowDeleted())
if err != nil {
return err
}
Expand Down Expand Up @@ -694,7 +730,7 @@ func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) {
// If object with link's name is found, error.
// If link with link's name is found, that's okay to overwrite.
// If there was an error that was not ErrObjectNotFound, error.
einfo, err := obs.GetInfo(name)
einfo, err := obs.GetInfo(name, GetObjectInfoShowDeleted())
if einfo != nil {
if !einfo.isLink() {
return nil, ErrObjectAlreadyExists
Expand Down Expand Up @@ -734,7 +770,7 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro
// If object with link's name is found, error.
// If link with link's name is found, that's okay to overwrite.
// If there was an error that was not ErrObjectNotFound, error.
einfo, err := ob.GetInfo(name)
einfo, err := ob.GetInfo(name, GetObjectInfoShowDeleted())
if einfo != nil {
if !einfo.isLink() {
return nil, ErrObjectAlreadyExists
Expand Down Expand Up @@ -765,7 +801,7 @@ func (obs *obs) PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectIn
}

// GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
func (obs *obs) GetBytes(name string, opts ...ObjectOpt) ([]byte, error) {
func (obs *obs) GetBytes(name string, opts ...GetObjectOpt) ([]byte, error) {
result, err := obs.Get(name, opts...)
if err != nil {
return nil, err
Expand All @@ -785,7 +821,7 @@ func (obs *obs) PutString(name string, data string, opts ...ObjectOpt) (*ObjectI
}

// GetString is a convenience function to pull an object from this object store and return it as a string.
func (obs *obs) GetString(name string, opts ...ObjectOpt) (string, error) {
func (obs *obs) GetString(name string, opts ...GetObjectOpt) (string, error) {
result, err := obs.Get(name, opts...)
if err != nil {
return _EMPTY_, err
Expand All @@ -810,7 +846,7 @@ func (obs *obs) PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) {
}

// GetFile is a convenience function to pull and object and place in a file.
func (obs *obs) GetFile(name, file string, opts ...ObjectOpt) error {
func (obs *obs) GetFile(name, file string, opts ...GetObjectOpt) error {
// Expect file to be new.
f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
Expand All @@ -830,12 +866,49 @@ func (obs *obs) GetFile(name, file string, opts ...ObjectOpt) error {
return err
}

type GetObjectInfoOpt interface {
configureGetInfo(opts *getObjectInfoOpts) error
}
type getObjectInfoOpts struct {
ctx context.Context
// Include deleted object in the result.
showDeleted bool
}

type getObjectInfoFn func(opts *getObjectInfoOpts) error

func (opt getObjectInfoFn) configureGetInfo(opts *getObjectInfoOpts) error {
return opt(opts)
}

// GetObjectInfoShowDeleted makes GetInfo() return object if it was marked as deleted.
func GetObjectInfoShowDeleted() GetObjectInfoOpt {
return getObjectInfoFn(func(opts *getObjectInfoOpts) error {
opts.showDeleted = true
return nil
})
}

// For nats.Context() support.
func (ctx ContextOpt) configureGetInfo(opts *getObjectInfoOpts) error {
opts.ctx = ctx
return nil
}

// GetInfo will retrieve the current information for the object.
func (obs *obs) GetInfo(name string) (*ObjectInfo, error) {
func (obs *obs) GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error) {
// Grab last meta value we have.
if name == "" {
return nil, ErrNameRequired
}
var o getObjectInfoOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureGetInfo(&o); err != nil {
return nil, err
}
}
}

metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, encodeName(name)) // used as data in a JS API call
stream := fmt.Sprintf(objNameTmpl, obs.name)
Expand All @@ -851,6 +924,9 @@ func (obs *obs) GetInfo(name string) (*ObjectInfo, error) {
if err := json.Unmarshal(m.Data, &info); err != nil {
return nil, ErrBadObjectMeta
}
if !o.showDeleted && info.Deleted {
return nil, ErrObjectNotFound
}
info.ModTime = m.Time
return &info, nil
}
Expand All @@ -864,17 +940,16 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error {
// Grab the current meta.
info, err := obs.GetInfo(name)
if err != nil {
if errors.Is(err, ErrObjectNotFound) {
return ErrUpdateMetaDeleted
}
return err
}

if info.Deleted {
return ErrUpdateMetaDeleted
}

// If the new name is different from the old, and it exists, error
// If there was an error that was not ErrObjectNotFound, error.
if name != meta.Name {
existingInfo, err := obs.GetInfo(meta.Name)
existingInfo, err := obs.GetInfo(meta.Name, GetObjectInfoShowDeleted())
if err != nil && !errors.Is(err, ErrObjectNotFound) {
return err
}
Expand Down Expand Up @@ -996,21 +1071,71 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) {
return w, nil
}

type ListObjectsOpt interface {
configureListObjects(opts *listObjectOpts) error
}
type listObjectOpts struct {
ctx context.Context
// Include deleted objects in the result channel.
showDeleted bool
}

type listObjectsFn func(opts *listObjectOpts) error

func (opt listObjectsFn) configureListObjects(opts *listObjectOpts) error {
return opt(opts)
}

// ListObjectsShowDeleted makes ListObjects() return deleted objects.
func ListObjectsShowDeleted() ListObjectsOpt {
return listObjectsFn(func(opts *listObjectOpts) error {
opts.showDeleted = true
return nil
})
}

// For nats.Context() support.
func (ctx ContextOpt) configureListObjects(opts *listObjectOpts) error {
opts.ctx = ctx
return nil
}

// List will list all the objects in this store.
func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) {
opts = append(opts, IgnoreDeletes())
watcher, err := obs.Watch(opts...)
func (obs *obs) List(opts ...ListObjectsOpt) ([]*ObjectInfo, error) {
var o listObjectOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureListObjects(&o); err != nil {
return nil, err
}
}
}
watchOpts := make([]WatchOpt, 0)
if !o.showDeleted {
watchOpts = append(watchOpts, IgnoreDeletes())
}
watcher, err := obs.Watch(watchOpts...)
if err != nil {
return nil, err
}
defer watcher.Stop()
if o.ctx == nil {
o.ctx = context.Background()
}

var objs []*ObjectInfo
for entry := range watcher.Updates() {
if entry == nil {
break
updates := watcher.Updates()
Updates:
for {
select {
case entry := <-updates:
if entry == nil {
break Updates
}
objs = append(objs, entry)
case <-o.ctx.Done():
return nil, o.ctx.Err()
}
objs = append(objs, entry)
}
if len(objs) == 0 {
return nil, ErrNoObjectsFound
Expand Down

0 comments on commit f410242

Please sign in to comment.