Skip to content

Commit

Permalink
Add SecureDeleteMsg method to JetStreamManager
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Aug 2, 2022
1 parent fc77603 commit 91580b7
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 7 deletions.
36 changes: 29 additions & 7 deletions jsm.go
Expand Up @@ -58,9 +58,12 @@ type JetStreamManager interface {
// The stream must have been created/updated with the AllowDirect boolean.
GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error)

// DeleteMsg erases a message from a stream.
// DeleteMsg deletes a message from a stream. The message is marked as erased, but its value is not overwritten.
DeleteMsg(name string, seq uint64, opts ...JSOpt) error

// SecureDeleteMsg deletes a message from a stream. The message value is overwritten with random data.
SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error

// AddConsumer adds a consumer to a stream.
AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

Expand Down Expand Up @@ -989,7 +992,8 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error
}

type msgDeleteRequest struct {
Seq uint64 `json:"seq"`
Seq uint64 `json:"seq"`
NoErase bool `json:"no_erase,omitempty"`
}

// msgDeleteResponse is the response for a Stream delete request.
Expand All @@ -999,6 +1003,7 @@ type msgDeleteResponse struct {
}

// DeleteMsg deletes a message from a stream.
// The message is marked as erased, but not overwritten
func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
Expand All @@ -1008,17 +1013,34 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
defer cancel()
}

if name == _EMPTY_ {
return ErrStreamNameRequired
return js.deleteMsg(o.ctx, name, msgDeleteRequest{Seq: seq, NoErase: true})
}

// SecureDeleteMsg deletes a message from a stream.
// The deleted message is overwritten with random data
func (js *js) SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
}
if cancel != nil {
defer cancel()
}

req, err := json.Marshal(&msgDeleteRequest{Seq: seq})
return js.deleteMsg(o.ctx, name, msgDeleteRequest{Seq: seq})
}

func (js *js) deleteMsg(ctx context.Context, stream string, req msgDeleteRequest) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}
reqJSON, err := json.Marshal(req)
if err != nil {
return err
}

dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name))
r, err := js.apiRequestWithContext(o.ctx, dsSubj, req)
dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, stream))
r, err := js.apiRequestWithContext(ctx, dsSubj, reqJSON)
if err != nil {
return err
}
Expand Down
107 changes: 107 additions & 0 deletions test/js_test.go
Expand Up @@ -2116,6 +2116,113 @@ func TestJetStreamManagement_DeleteMsg(t *testing.T) {
}
}

func TestJetStreamManagement_SecureDeleteMsg(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

_, err = js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.A", "foo.B", "foo.C"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 0; i < 5; i++ {
js.Publish("foo.A", []byte("A"))
js.Publish("foo.B", []byte("B"))
js.Publish("foo.C", []byte("C"))
}

si, err := js.StreamInfo("foo")
if err != nil {
t.Fatal(err)
}
var total uint64 = 15
if si.State.Msgs != total {
t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs)
}

expected := 5
msgs := make([]*nats.Msg, 0)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

sub, err := js.Subscribe("foo.C", func(msg *nats.Msg) {
msgs = append(msgs, msg)
if len(msgs) == expected {
cancel()
}
})
if err != nil {
t.Fatal(err)
}
<-ctx.Done()
sub.Unsubscribe()

got := len(msgs)
if got != expected {
t.Fatalf("Expected %d, got %d", expected, got)
}

msg := msgs[0]
meta, err := msg.Metadata()
if err != nil {
t.Fatal(err)
}
originalSeq := meta.Sequence.Stream

err = js.SecureDeleteMsg("foo", originalSeq)
if err != nil {
t.Fatal(err)
}

si, err = js.StreamInfo("foo")
if err != nil {
t.Fatal(err)
}
total = 14
if si.State.Msgs != total {
t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs)
}

// There should be only 4 messages since one deleted.
expected = 4
msgs = make([]*nats.Msg, 0)
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

sub, err = js.Subscribe("foo.C", func(msg *nats.Msg) {
msgs = append(msgs, msg)

if len(msgs) == expected {
cancel()
}
})
if err != nil {
t.Fatal(err)
}
<-ctx.Done()
sub.Unsubscribe()

msg = msgs[0]
meta, err = msg.Metadata()
if err != nil {
t.Fatal(err)
}
newSeq := meta.Sequence.Stream

// First message removed
if newSeq <= originalSeq {
t.Errorf("Expected %d to be higher sequence than %d", newSeq, originalSeq)
}
}

func TestJetStreamImport(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
Expand Down

0 comments on commit 91580b7

Please sign in to comment.