From a9eec7ac64983ac190a80700b3119935e0690ab4 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 7 Oct 2021 14:37:23 -0700 Subject: [PATCH] Add context awareness to ObjectStore Signed-off-by: Derek Collison --- object.go | 149 ++++++++++++++++++++++++++++++++++++-------- test/object_test.go | 57 +++++++++++++++++ 2 files changed, 181 insertions(+), 25 deletions(-) diff --git a/object.go b/object.go index d1852a8f0..13dd7b280 100644 --- a/object.go +++ b/object.go @@ -15,6 +15,7 @@ package nats import ( "bytes" + "context" "crypto/sha256" "encoding/base64" "encoding/json" @@ -47,24 +48,24 @@ type ObjectStoreManager interface { // This functionality is EXPERIMENTAL and may be changed in later releases. type ObjectStore interface { // Put will place the contents from the reader into a new object. - Put(obj *ObjectMeta, reader io.Reader) (*ObjectInfo, error) + Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) // Get will pull the named object from the object store. - Get(name string) (ObjectResult, error) + Get(name string, opts ...ObjectOpt) (ObjectResult, error) // PutBytes is convenience function to put a byte slice into this object store. - PutBytes(name string, data []byte) (*ObjectInfo, error) + PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. - GetBytes(name string) ([]byte, error) + GetBytes(name string, opts ...ObjectOpt) ([]byte, error) // PutBytes is convenience function to put a string into this object store. - PutString(name string, data string) (*ObjectInfo, error) + PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) // GetString is a convenience function to pull an object from this object store and return it as a string. - GetString(name string) (string, error) + GetString(name string, opts ...ObjectOpt) (string, error) // PutFile is convenience function to put a file into this object store. - PutFile(file string) (*ObjectInfo, error) + PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) // GetFile is a convenience function to pull an object from this object store and place it in a file. - GetFile(name, file string) error + GetFile(name, file string, opts ...ObjectOpt) error // GetInfo will retrieve the current information for the object. GetInfo(name string) (*ObjectInfo, error) @@ -90,6 +91,20 @@ type ObjectStore interface { List(opts ...WatchOpt) ([]*ObjectInfo, error) } +type ObjectOpt interface { + configureObject(opts *objOpts) error +} + +type objOpts struct { + ctx context.Context +} + +// For nats.Context() support. +func (ctx ContextOpt) configureObject(opts *objOpts) error { + opts.ctx = ctx + return nil +} + // ObjectWatcher is what is returned when doing a watch. type ObjectWatcher interface { // Updates returns a channel to read any updates to entries. @@ -244,8 +259,8 @@ func sanitizeName(name string) string { return strings.ReplaceAll(stream, " ", "_") } -// PutObject will place the contents from the reader into a new stream. -func (obs *obs) Put(meta *ObjectMeta, r io.Reader) (*ObjectInfo, error) { +// Put will place the contents from the reader into this object-store. +func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) { if meta == nil { return nil, ErrBadObjectMeta } @@ -255,6 +270,16 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader) (*ObjectInfo, error) { return nil, ErrInvalidObjectName } + var o objOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureObject(&o); err != nil { + return nil, err + } + } + } + ctx := o.ctx + // Grab existing meta info. einfo, err := obs.GetInfo(meta.Name) if err != nil && err != ErrObjectNotFound { @@ -298,6 +323,24 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader) (*ObjectInfo, error) { info := &ObjectInfo{Bucket: obs.name, NUID: id, ObjectMeta: *meta} for r != nil { + if ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + err = ctx.Err() + } else { + err = ErrTimeout + } + default: + } + if err != nil { + purgePartial() + return nil, err + } + } + + // Actual read. + // TODO(dlc) - Deadline? n, err := r.Read(chunk) // EOF Processing. @@ -377,6 +420,7 @@ type objResult struct { info *ObjectInfo r io.ReadCloser err error + ctx context.Context } func (info *ObjectInfo) isLink() bool { @@ -384,7 +428,7 @@ func (info *ObjectInfo) isLink() bool { } // GetObject will pull the object from the underlying stream. -func (obs *obs) Get(name string) (ObjectResult, error) { +func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) { // Grab meta info. info, err := obs.GetInfo(name) if err != nil { @@ -406,7 +450,17 @@ func (obs *obs) Get(name string) (ObjectResult, error) { return lobs.Get(info.ObjectMeta.Opts.Link.Name) } - result := &objResult{info: info} + var o objOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureObject(&o); err != nil { + return nil, err + } + } + } + ctx := o.ctx + + result := &objResult{info: info, ctx: ctx} if info.Size == 0 { return result, nil } @@ -424,6 +478,22 @@ func (obs *obs) Get(name string) (ObjectResult, error) { h := sha256.New() processChunk := func(m *Msg) { + if ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + err = ctx.Err() + } else { + err = ErrTimeout + } + default: + } + if err != nil { + gotErr(m, err) + return + } + } + tokens, err := getMetadataFields(m.Reply) if err != nil { gotErr(m, err) @@ -555,13 +625,13 @@ func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, erro } // PutBytes is convenience function to put a byte slice into this object store. -func (obs *obs) PutBytes(name string, data []byte) (*ObjectInfo, error) { - return obs.Put(&ObjectMeta{Name: name}, bytes.NewReader(data)) +func (obs *obs) PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) { + return obs.Put(&ObjectMeta{Name: name}, bytes.NewReader(data), opts...) } // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. -func (obs *obs) GetBytes(name string) ([]byte, error) { - result, err := obs.Get(name) +func (obs *obs) GetBytes(name string, opts ...ObjectOpt) ([]byte, error) { + result, err := obs.Get(name, opts...) if err != nil { return nil, err } @@ -575,13 +645,13 @@ func (obs *obs) GetBytes(name string) ([]byte, error) { } // PutBytes is convenience function to put a string into this object store. -func (obs *obs) PutString(name string, data string) (*ObjectInfo, error) { - return obs.Put(&ObjectMeta{Name: name}, strings.NewReader(data)) +func (obs *obs) PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) { + return obs.Put(&ObjectMeta{Name: name}, strings.NewReader(data), opts...) } // GetString is a convenience function to pull an object from this object store and return it as a string. -func (obs *obs) GetString(name string) (string, error) { - result, err := obs.Get(name) +func (obs *obs) GetString(name string, opts ...ObjectOpt) (string, error) { + result, err := obs.Get(name, opts...) if err != nil { return _EMPTY_, err } @@ -595,17 +665,17 @@ func (obs *obs) GetString(name string) (string, error) { } // PutFile is convenience function to put a file into an object store. -func (obs *obs) PutFile(file string) (*ObjectInfo, error) { +func (obs *obs) PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) { f, err := os.Open(file) if err != nil { return nil, err } defer f.Close() - return obs.Put(&ObjectMeta{Name: file}, f) + return obs.Put(&ObjectMeta{Name: file}, f, opts...) } // GetFile is a convenience function to pull and object and place in a file. -func (obs *obs) GetFile(name, file string) error { +func (obs *obs) GetFile(name, file string, opts ...ObjectOpt) error { // Expect file to be new. f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE, 0600) if err != nil { @@ -613,7 +683,7 @@ func (obs *obs) GetFile(name, file string) error { } defer f.Close() - result, err := obs.Get(name) + result, err := obs.Get(name, opts...) if err != nil { os.Remove(f.Name()) return err @@ -791,13 +861,42 @@ func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) { func (o *objResult) Read(p []byte) (n int, err error) { o.Lock() defer o.Unlock() + if ctx := o.ctx; ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + o.err = ctx.Err() + } else { + o.err = ErrTimeout + } + default: + } + } if o.err != nil { return 0, err } if o.r == nil { return 0, io.EOF } - return o.r.Read(p) + + r := o.r.(net.Conn) + r.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + n, err = r.Read(p) + if err, ok := err.(net.Error); ok && err.Timeout() { + if ctx := o.ctx; ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + return 0, ctx.Err() + } else { + return 0, ErrTimeout + } + default: + err = nil + } + } + } + return n, err } // Close impl. diff --git a/test/object_test.go b/test/object_test.go index c1c70684e..7ce770cdd 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -15,7 +15,9 @@ package test import ( "bytes" + "context" "crypto/rand" + "io" "io/ioutil" "os" "path" @@ -537,3 +539,58 @@ func TestObjectList(t *testing.T) { t.Fatalf("Expected %+v but got %+v", expected, omap) } } + +func TestObjectContextOpt(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "OBJS"}) + expectOk(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + time.AfterFunc(100*time.Millisecond, cancel) + + start := time.Now() + _, err = obs.Put(&nats.ObjectMeta{Name: "TEST"}, &slow{1000}, nats.Context(ctx)) + expectErr(t, err) + if delta := time.Since(start); delta > time.Second { + t.Fatalf("Cancel took too long: %v", delta) + } + si, err := js.StreamInfo("OBJ_OBJS") + expectOk(t, err) + if si.State.Msgs != 0 { + t.Fatalf("Expected no messages after canceling put, got %+v", si.State) + } + + // Now put a large object in there. + blob := make([]byte, 8*1024*1024) + rand.Read(blob) + _, err = obs.PutBytes("BLOB", blob) + expectOk(t, err) + + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) + time.AfterFunc(100*time.Millisecond, cancel) + + time.AfterFunc(20*time.Millisecond, func() { shutdown(s) }) + start = time.Now() + _, err = obs.GetBytes("BLOB", nats.Context(ctx)) + expectErr(t, err) + if delta := time.Since(start); delta > time.Second { + t.Fatalf("Cancel took too long: %v", delta) + } +} + +type slow struct{ n int } + +func (sr *slow) Read(p []byte) (n int, err error) { + if sr.n <= 0 { + return 0, io.EOF + } + sr.n-- + time.Sleep(10 * time.Millisecond) + p = append(p, 'A') + return 1, nil +}