Skip to content

Commit

Permalink
Add support for enhanced stream purge in JetStream
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Jun 6, 2022
1 parent ce0cf21 commit 6ca26dd
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 9 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Expand Up @@ -39,4 +39,7 @@ _testmain.go
# bin

# Goland
.idea
.idea

# VS Code
.vscode
7 changes: 7 additions & 0 deletions js.go
Expand Up @@ -228,6 +228,8 @@ type jsOpts struct {
// enables protocol tracing
ctrace ClientTrace
shouldTrace bool
// purgeOpts contains optional stream purge options
purgeOpts *StreamPurgeRequest
}

const (
Expand Down Expand Up @@ -294,6 +296,11 @@ func Domain(domain string) JSOpt {

}

func (s *StreamPurgeRequest) configureJSContext(js *jsOpts) error {
js.purgeOpts = s
return nil
}

// APIPrefix changes the default prefix used for the JetStream API.
func APIPrefix(pre string) JSOpt {
return jsOptFn(func(js *jsOpts) error {
Expand Down
19 changes: 15 additions & 4 deletions jsm.go
Expand Up @@ -925,8 +925,8 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
return nil
}

// streamPurgeRequest is optional request information to the purge API.
type streamPurgeRequest struct {
// StreamPurgeRequest is optional request information to the purge API.
type StreamPurgeRequest struct {
// Purge up to but not including sequence.
Sequence uint64 `json:"seq,omitempty"`
// Subject to match against messages for the purge command.
Expand All @@ -946,10 +946,18 @@ func (js *js) PurgeStream(stream string, opts ...JSOpt) error {
if err := checkStreamName(stream); err != nil {
return err
}
return js.purgeStream(stream, nil)
var req *StreamPurgeRequest
var ok bool
for _, opt := range opts {
// For PurgeStream, only request body opt is relevant
if req, ok = opt.(*StreamPurgeRequest); ok {
break
}
}
return js.purgeStream(stream, req)
}

func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt) error {
func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
Expand All @@ -975,6 +983,9 @@ func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt)
return err
}
if resp.Error != nil {
if resp.Error.Code == 400 {
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
}
return errors.New(resp.Error.Description)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion kv.go
Expand Up @@ -717,7 +717,7 @@ func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error {
}

var (
pr streamPurgeRequest
pr StreamPurgeRequest
b strings.Builder
)
// Do actual purges here.
Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -167,6 +167,7 @@ var (
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrBadRequest = errors.New("nats: bad request")
)

func init() {
Expand Down
6 changes: 3 additions & 3 deletions object.go
Expand Up @@ -336,7 +336,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return perr
}

purgePartial := func() { obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) }
purgePartial := func() { obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) }

// Create our own JS context to handle errors etc.
js, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) }))
Expand Down Expand Up @@ -439,7 +439,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
// Delete any original one.
if einfo != nil && !einfo.Deleted {
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID)
obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj})
obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

return info, nil
Expand Down Expand Up @@ -600,7 +600,7 @@ func (obs *obs) Delete(name string) error {

// Purge chunks for the object.
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
return obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj})
return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

// AddLink will add a link to another object into this object store.
Expand Down

0 comments on commit 6ca26dd

Please sign in to comment.