From 2a932dd5250288066d6dc7dbc4cfc634a765e477 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 2 Aug 2022 11:40:00 +0200 Subject: [PATCH] Add SecureDeleteMsg method to JetStreamManager --- jsm.go | 37 +++++++++++--- test/js_test.go | 131 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+), 7 deletions(-) diff --git a/jsm.go b/jsm.go index faeacabb7..8d8545012 100644 --- a/jsm.go +++ b/jsm.go @@ -58,9 +58,13 @@ type JetStreamManager interface { // The stream must have been created/updated with the AllowDirect boolean. GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error) - // DeleteMsg erases a message from a stream. + // DeleteMsg deletes a message from a stream. The message is marked as erased, but its value is not overwritten. DeleteMsg(name string, seq uint64, opts ...JSOpt) error + // SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data + // As a result, this operation is slower than DeleteMsg() + SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error + // AddConsumer adds a consumer to a stream. AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) @@ -1012,7 +1016,8 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error } type msgDeleteRequest struct { - Seq uint64 `json:"seq"` + Seq uint64 `json:"seq"` + NoErase bool `json:"no_erase,omitempty"` } // msgDeleteResponse is the response for a Stream delete request. @@ -1022,6 +1027,7 @@ type msgDeleteResponse struct { } // DeleteMsg deletes a message from a stream. +// The message is marked as erased, but not overwritten func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error { o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { @@ -1031,17 +1037,34 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error { defer cancel() } - if name == _EMPTY_ { - return ErrStreamNameRequired + return js.deleteMsg(o.ctx, name, &msgDeleteRequest{Seq: seq, NoErase: true}) +} + +// SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data +// As a result, this operation is slower than DeleteMsg() +func (js *js) SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return err + } + if cancel != nil { + defer cancel() } - req, err := json.Marshal(&msgDeleteRequest{Seq: seq}) + return js.deleteMsg(o.ctx, name, &msgDeleteRequest{Seq: seq}) +} + +func (js *js) deleteMsg(ctx context.Context, stream string, req *msgDeleteRequest) error { + if err := checkStreamName(stream); err != nil { + return err + } + reqJSON, err := json.Marshal(req) if err != nil { return err } - dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name)) - r, err := js.apiRequestWithContext(o.ctx, dsSubj, req) + dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, stream)) + r, err := js.apiRequestWithContext(ctx, dsSubj, reqJSON) if err != nil { return err } diff --git a/test/js_test.go b/test/js_test.go index 3cd8e4d22..f644d64a3 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2207,10 +2207,141 @@ func TestJetStreamManagement_DeleteMsg(t *testing.T) { } originalSeq := meta.Sequence.Stream + // create a subscription on delete message API subject to verify the content of delete operation + apiSub, err := nc.SubscribeSync("$JS.API.STREAM.MSG.DELETE.foo") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } err = js.DeleteMsg("foo", originalSeq) if err != nil { t.Fatal(err) } + msg, err = apiSub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if str := string(msg.Data); !strings.Contains(str, "no_erase\":true") { + t.Fatalf("Request should not have no_erase field set: %s", str) + } + + si, err = js.StreamInfo("foo") + if err != nil { + t.Fatal(err) + } + total = 14 + if si.State.Msgs != total { + t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs) + } + + // There should be only 4 messages since one deleted. + expected = 4 + msgs = make([]*nats.Msg, 0) + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + sub, err = js.Subscribe("foo.C", func(msg *nats.Msg) { + msgs = append(msgs, msg) + + if len(msgs) == expected { + cancel() + } + }) + if err != nil { + t.Fatal(err) + } + <-ctx.Done() + sub.Unsubscribe() + + msg = msgs[0] + meta, err = msg.Metadata() + if err != nil { + t.Fatal(err) + } + newSeq := meta.Sequence.Stream + + // First message removed + if newSeq <= originalSeq { + t.Errorf("Expected %d to be higher sequence than %d", newSeq, originalSeq) + } +} + +func TestJetStreamManagement_SecureDeleteMsg(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "foo", + Subjects: []string{"foo.A", "foo.B", "foo.C"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for i := 0; i < 5; i++ { + js.Publish("foo.A", []byte("A")) + js.Publish("foo.B", []byte("B")) + js.Publish("foo.C", []byte("C")) + } + + si, err := js.StreamInfo("foo") + if err != nil { + t.Fatal(err) + } + var total uint64 = 15 + if si.State.Msgs != total { + t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs) + } + + expected := 5 + msgs := make([]*nats.Msg, 0) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + sub, err := js.Subscribe("foo.C", func(msg *nats.Msg) { + msgs = append(msgs, msg) + if len(msgs) == expected { + cancel() + } + }) + if err != nil { + t.Fatal(err) + } + <-ctx.Done() + sub.Unsubscribe() + + got := len(msgs) + if got != expected { + t.Fatalf("Expected %d, got %d", expected, got) + } + + msg := msgs[0] + meta, err := msg.Metadata() + if err != nil { + t.Fatal(err) + } + originalSeq := meta.Sequence.Stream + + // create a subscription on delete message API subject to verify the content of delete operation + apiSub, err := nc.SubscribeSync("$JS.API.STREAM.MSG.DELETE.foo") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + err = js.SecureDeleteMsg("foo", originalSeq) + if err != nil { + t.Fatal(err) + } + msg, err = apiSub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if str := string(msg.Data); strings.Contains(str, "no_erase\":true") { + t.Fatalf("Request should not have no_erase field set: %s", str) + } si, err = js.StreamInfo("foo") if err != nil {