Skip to content

Commit

Permalink
Remove public option and move one test to nats package
Browse files Browse the repository at this point in the history
This allows to modify the threshold in the kvs struct for this test

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 15, 2022
1 parent 2aa1139 commit fd44bee
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 64 deletions.
12 changes: 1 addition & 11 deletions kv.go
Expand Up @@ -159,16 +159,6 @@ type KeyValueConfig struct {
MaxBytes int64
Storage StorageType
Replicas int

// This is for the maintenance PurgeDeletes() function. Normally, when
// this function is invoked, keys that have a delete or purge marker
// as the last entry see all their entries removed, including the
// marker. This option allows to delete old data but keep the marker
// if its timestamp is not older than this value. If this option
// is not specified, the API will pick a default of 30 minutes.
// Explicitly set it to a negative value to restore previous behavior
// to delete markers, regardless their age.
PurgeDeletesMarkerThreshold time.Duration
}

// Used to watch all keys.
Expand Down Expand Up @@ -334,7 +324,6 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
stream: scfg.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket),
js: js,
pdthr: cfg.PurgeDeletesMarkerThreshold,
}
return kv, nil
}
Expand Down Expand Up @@ -560,6 +549,7 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
defer watcher.Stop()

var limit time.Time
// We are not exposing the option for now, but allow tests to set it...
olderThan := kv.pdthr
// Negative value is used to instruct to always remove markers, regardless
// of age. If set to 0 (or not set), use our default value.
Expand Down
75 changes: 75 additions & 0 deletions kv_test.go
@@ -0,0 +1,75 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
"testing"
"time"
)

func expectOk(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

func TestKeyValuePurgeDeletesMarkerThreshold(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "KVS", History: 10})
expectOk(t, err)

// Override the marker threshold
kv.(*kvs).pdthr = 100 * time.Millisecond

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

put("foo", "foo1")
put("bar", "bar1")
put("foo", "foo2")
err = kv.Delete("foo")
expectOk(t, err)

time.Sleep(200 * time.Millisecond)

err = kv.Delete("bar")
expectOk(t, err)

err = kv.PurgeDeletes()
expectOk(t, err)

// The key foo should have been completely cleared of the data
// and the delete marker.
fooEntries, err := kv.History("foo")
if err != ErrKeyNotFound {
t.Fatalf("Expected all entries for key foo to be gone, got err=%v entries=%v", err, fooEntries)
}
barEntries, err := kv.History("bar")
expectOk(t, err)
if len(barEntries) != 1 {
t.Fatalf("Expected 1 entry, got %v", barEntries)
}
if e := barEntries[0]; e.Operation() != KeyValueDelete {
t.Fatalf("Unexpected entry: %+v", e)
}
}
69 changes: 16 additions & 53 deletions test/kv_test.go
Expand Up @@ -400,11 +400,7 @@ func TestKeyValueDeleteTombstones(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "KVS",
History: 10,
PurgeDeletesMarkerThreshold: -1, // Set negative to remove all markers, regardless of age
})
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10})
expectOk(t, err)

put := func(key, value string) {
Expand All @@ -429,58 +425,25 @@ func TestKeyValueDeleteTombstones(t *testing.T) {

si, err := js.StreamInfo("KV_KVS")
expectOk(t, err)
if si.State.Msgs != 0 {
// Since tombstones are less than 30 minutes old, there should be 100 messages,
// corresponding to 1 tombstone per key.
if si.State.Msgs != 100 {
t.Fatalf("Expected no stream msgs to be left, got %d", si.State.Msgs)
}
}

func TestKeyValuePurgeDeletesMarkerThreshold(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "KVS",
History: 10,
PurgeDeletesMarkerThreshold: 100 * time.Millisecond,
})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

put("foo", "foo1")
put("bar", "bar1")
put("foo", "foo2")
err = kv.Delete("foo")
w, err := kv.WatchAll()
expectOk(t, err)

time.Sleep(200 * time.Millisecond)

err = kv.Delete("bar")
expectOk(t, err)

err = kv.PurgeDeletes()
expectOk(t, err)

// The key foo should have been completely cleared of the data
// and the delete marker.
fooEntries, err := kv.History("foo")
if err != nats.ErrKeyNotFound {
t.Fatalf("Expected all entries for key foo to be gone, got err=%v entries=%v", err, fooEntries)
}
barEntries, err := kv.History("bar")
expectOk(t, err)
if len(barEntries) != 1 {
t.Fatalf("Expected 1 entry, got %v", barEntries)
count := 0
for e := range w.Updates() {
if e == nil {
break
}
if e.Operation() != nats.KeyValueDelete {
t.Fatalf("Invalid entry: %+v", e)
}
count++
}
if e := barEntries[0]; e.Operation() != nats.KeyValueDelete {
t.Fatalf("Unexpected entry: %+v", e)
if count != 100 {
t.Fatalf("Expected 100 tombstones, got %v", count)
}
}

Expand Down

0 comments on commit fd44bee

Please sign in to comment.