Skip to content

Commit

Permalink
Add context awareness to ObjectStore
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Oct 7, 2021
1 parent c6d9b8e commit 820b4a3
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 25 deletions.
149 changes: 124 additions & 25 deletions object.go
Expand Up @@ -15,6 +15,7 @@ package nats

import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/json"
Expand Down Expand Up @@ -47,24 +48,24 @@ type ObjectStoreManager interface {
// This functionality is EXPERIMENTAL and may be changed in later releases.
type ObjectStore interface {
// Put will place the contents from the reader into a new object.
Put(obj *ObjectMeta, reader io.Reader) (*ObjectInfo, error)
Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error)
// Get will pull the named object from the object store.
Get(name string) (ObjectResult, error)
Get(name string, opts ...ObjectOpt) (ObjectResult, error)

// PutBytes is convenience function to put a byte slice into this object store.
PutBytes(name string, data []byte) (*ObjectInfo, error)
PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error)
// GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
GetBytes(name string) ([]byte, error)
GetBytes(name string, opts ...ObjectOpt) ([]byte, error)

// PutBytes is convenience function to put a string into this object store.
PutString(name string, data string) (*ObjectInfo, error)
PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error)
// GetString is a convenience function to pull an object from this object store and return it as a string.
GetString(name string) (string, error)
GetString(name string, opts ...ObjectOpt) (string, error)

// PutFile is convenience function to put a file into this object store.
PutFile(file string) (*ObjectInfo, error)
PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error)
// GetFile is a convenience function to pull an object from this object store and place it in a file.
GetFile(name, file string) error
GetFile(name, file string, opts ...ObjectOpt) error

// GetInfo will retrieve the current information for the object.
GetInfo(name string) (*ObjectInfo, error)
Expand All @@ -90,6 +91,20 @@ type ObjectStore interface {
List(opts ...WatchOpt) ([]*ObjectInfo, error)
}

type ObjectOpt interface {
configureObject(opts *objOpts) error
}

type objOpts struct {
ctx context.Context
}

// For nats.Context() support.
func (ctx ContextOpt) configureObject(opts *objOpts) error {
opts.ctx = ctx
return nil
}

// ObjectWatcher is what is returned when doing a watch.
type ObjectWatcher interface {
// Updates returns a channel to read any updates to entries.
Expand Down Expand Up @@ -244,8 +259,8 @@ func sanitizeName(name string) string {
return strings.ReplaceAll(stream, " ", "_")
}

// PutObject will place the contents from the reader into a new stream.
func (obs *obs) Put(meta *ObjectMeta, r io.Reader) (*ObjectInfo, error) {
// Put will place the contents from the reader into this object-store.
func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) {
if meta == nil {
return nil, ErrBadObjectMeta
}
Expand All @@ -255,6 +270,16 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader) (*ObjectInfo, error) {
return nil, ErrInvalidObjectName
}

var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil, err
}
}
}
ctx := o.ctx

// Grab existing meta info.
einfo, err := obs.GetInfo(meta.Name)
if err != nil && err != ErrObjectNotFound {
Expand Down Expand Up @@ -298,6 +323,24 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader) (*ObjectInfo, error) {
info := &ObjectInfo{Bucket: obs.name, NUID: id, ObjectMeta: *meta}

for r != nil {
if ctx != nil {
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
err = ctx.Err()
} else {
err = ErrTimeout
}
default:
}
if err != nil {
purgePartial()
return nil, err
}
}

// Actual read.
// TODO(dlc) - Deadline?
n, err := r.Read(chunk)

// EOF Processing.
Expand Down Expand Up @@ -377,14 +420,15 @@ type objResult struct {
info *ObjectInfo
r io.ReadCloser
err error
ctx context.Context
}

func (info *ObjectInfo) isLink() bool {
return info.ObjectMeta.Opts != nil && info.ObjectMeta.Opts.Link != nil
}

// GetObject will pull the object from the underlying stream.
func (obs *obs) Get(name string) (ObjectResult, error) {
func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) {
// Grab meta info.
info, err := obs.GetInfo(name)
if err != nil {
Expand All @@ -406,7 +450,17 @@ func (obs *obs) Get(name string) (ObjectResult, error) {
return lobs.Get(info.ObjectMeta.Opts.Link.Name)
}

result := &objResult{info: info}
var o objOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configureObject(&o); err != nil {
return nil, err
}
}
}
ctx := o.ctx

result := &objResult{info: info, ctx: ctx}
if info.Size == 0 {
return result, nil
}
Expand All @@ -424,6 +478,22 @@ func (obs *obs) Get(name string) (ObjectResult, error) {
h := sha256.New()

processChunk := func(m *Msg) {
if ctx != nil {
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
err = ctx.Err()
} else {
err = ErrTimeout
}
default:
}
if err != nil {
gotErr(m, err)
return
}
}

tokens, err := getMetadataFields(m.Reply)
if err != nil {
gotErr(m, err)
Expand Down Expand Up @@ -555,13 +625,13 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro
}

// PutBytes is convenience function to put a byte slice into this object store.
func (obs *obs) PutBytes(name string, data []byte) (*ObjectInfo, error) {
return obs.Put(&ObjectMeta{Name: name}, bytes.NewReader(data))
func (obs *obs) PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) {
return obs.Put(&ObjectMeta{Name: name}, bytes.NewReader(data), opts...)
}

// GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
func (obs *obs) GetBytes(name string) ([]byte, error) {
result, err := obs.Get(name)
func (obs *obs) GetBytes(name string, opts ...ObjectOpt) ([]byte, error) {
result, err := obs.Get(name, opts...)
if err != nil {
return nil, err
}
Expand All @@ -575,13 +645,13 @@ func (obs *obs) GetBytes(name string) ([]byte, error) {
}

// PutBytes is convenience function to put a string into this object store.
func (obs *obs) PutString(name string, data string) (*ObjectInfo, error) {
return obs.Put(&ObjectMeta{Name: name}, strings.NewReader(data))
func (obs *obs) PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) {
return obs.Put(&ObjectMeta{Name: name}, strings.NewReader(data), opts...)
}

// GetString is a convenience function to pull an object from this object store and return it as a string.
func (obs *obs) GetString(name string) (string, error) {
result, err := obs.Get(name)
func (obs *obs) GetString(name string, opts ...ObjectOpt) (string, error) {
result, err := obs.Get(name, opts...)
if err != nil {
return _EMPTY_, err
}
Expand All @@ -595,25 +665,25 @@ func (obs *obs) GetString(name string) (string, error) {
}

// PutFile is convenience function to put a file into an object store.
func (obs *obs) PutFile(file string) (*ObjectInfo, error) {
func (obs *obs) PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) {
f, err := os.Open(file)
if err != nil {
return nil, err
}
defer f.Close()
return obs.Put(&ObjectMeta{Name: file}, f)
return obs.Put(&ObjectMeta{Name: file}, f, opts...)
}

// GetFile is a convenience function to pull and object and place in a file.
func (obs *obs) GetFile(name, file string) error {
func (obs *obs) GetFile(name, file string, opts ...ObjectOpt) error {
// Expect file to be new.
f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
}
defer f.Close()

result, err := obs.Get(name)
result, err := obs.Get(name, opts...)
if err != nil {
os.Remove(f.Name())
return err
Expand Down Expand Up @@ -791,13 +861,42 @@ func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) {
func (o *objResult) Read(p []byte) (n int, err error) {
o.Lock()
defer o.Unlock()
if ctx := o.ctx; ctx != nil {
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
o.err = ctx.Err()
} else {
o.err = ErrTimeout
}
default:
}
}
if o.err != nil {
return 0, err
}
if o.r == nil {
return 0, io.EOF
}
return o.r.Read(p)

r := o.r.(net.Conn)
r.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
n, err = r.Read(p)
if err, ok := err.(net.Error); ok && err.Timeout() {
if ctx := o.ctx; ctx != nil {
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
return 0, ctx.Err()
} else {
return 0, ErrTimeout
}
default:
err = nil
}
}
}
return n, err
}

// Close impl.
Expand Down
57 changes: 57 additions & 0 deletions test/object_test.go
Expand Up @@ -15,7 +15,9 @@ package test

import (
"bytes"
"context"
"crypto/rand"
"io"
"io/ioutil"
"os"
"path"
Expand Down Expand Up @@ -537,3 +539,58 @@ func TestObjectList(t *testing.T) {
t.Fatalf("Expected %+v but got %+v", expected, omap)
}
}

func TestObjectContextOpt(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)

nc, js := jsClient(t, s)
defer nc.Close()

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"})
expectOk(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
time.AfterFunc(100*time.Millisecond, cancel)

start := time.Now()
_, err = obs.Put(&nats.ObjectMeta{Name: "TEST"}, &slow{1000}, nats.Context(ctx))
expectErr(t, err)
if delta := time.Since(start); delta > time.Second {
t.Fatalf("Cancel took too long: %v", delta)
}
si, err := js.StreamInfo("OBJ_OBJS")
expectOk(t, err)
if si.State.Msgs != 0 {
t.Fatalf("Expected no messages after canceling put, got %+v", si.State)
}

// Now put a large object in there.
blob := make([]byte, 8*1024*1024)
rand.Read(blob)
_, err = obs.PutBytes("BLOB", blob)
expectOk(t, err)

ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
time.AfterFunc(100*time.Millisecond, cancel)

time.AfterFunc(20*time.Millisecond, func() { shutdown(s) })
start = time.Now()
_, err = obs.GetBytes("BLOB", nats.Context(ctx))
expectErr(t, err)
if delta := time.Since(start); delta > time.Second {
t.Fatalf("Cancel took too long: %v", delta)
}
}

type slow struct{ n int }

func (sr *slow) Read(p []byte) (n int, err error) {
if sr.n <= 0 {
return 0, io.EOF
}
sr.n--
time.Sleep(10 * time.Millisecond)
p[0] = 'A'
return 1, nil
}

0 comments on commit 820b4a3

Please sign in to comment.