diff --git a/go.mod b/go.mod index e2834ece..1e0b369a 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/klauspost/compress v1.13.4 github.com/nats-io/jsm.go v0.0.27-0.20211006163108-9aae04fb57e9 github.com/nats-io/nats-server/v2 v2.6.2-0.20211006145508-3f12216fcc34 - github.com/nats-io/nats.go v1.12.3 + github.com/nats-io/nats.go v1.13.0 github.com/nats-io/nuid v1.0.1 github.com/prometheus/client_golang v1.11.0 github.com/prometheus/common v0.26.0 diff --git a/go.sum b/go.sum index 266ffd38..ab76ceb0 100644 --- a/go.sum +++ b/go.sum @@ -118,8 +118,9 @@ github.com/nats-io/jwt/v2 v2.1.0 h1:1UbfD5g1xTdWmSeRV8bh/7u+utTiBsRtWhLl1PixZp4= github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.6.2-0.20211006145508-3f12216fcc34 h1:Qq2jwQrv/hyU6MmYZR5NYJ6wKVwRULJpn0CoDMtMUZg= github.com/nats-io/nats-server/v2 v2.6.2-0.20211006145508-3f12216fcc34/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80= -github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E= github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE= +github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/nats/kv_command.go b/nats/kv_command.go index 9105bde6..4ae4dd43 100644 --- a/nats/kv_command.go +++ b/nats/kv_command.go @@ -1,8 +1,6 @@ package main import ( - "context" - "encoding/json" "fmt" "io/ioutil" "os" @@ -10,7 +8,6 @@ import ( "github.com/dustin/go-humanize" "github.com/fatih/color" - "github.com/nats-io/jsm.go/kv" "github.com/nats-io/nats.go" "gopkg.in/alecthomas/kingpin.v2" ) @@ -20,7 +17,6 @@ type kvCommand struct { key string val string raw bool - asJson bool history uint64 ttl time.Duration replicas uint @@ -28,6 +24,7 @@ type kvCommand struct { maxValueSize int32 maxBucketSize int64 cluster string + revision uint64 } func configureKVCommand(app *kingpin.Application) { @@ -38,8 +35,7 @@ func configureKVCommand(app *kingpin.Application) { The JetStream Key-Value store uses streams to store key-value pairs for an indefinite period or a per-bucket configured TTL. -The Key-Value store supports read-after-write safety when not using -any caches or read replicas. +The Key-Value store supports read-after-write safety. NOTE: This is an experimental feature. ` @@ -56,6 +52,17 @@ NOTE: This is an experimental feature. put.Arg("key", "The key to act on").Required().StringVar(&c.key) put.Arg("value", "The value to store, when empty reads STDIN").StringVar(&c.val) + create := kv.Command("create", "Puts a value into a key only if the key is new or it's last operation was a delete").Action(c.createAction) + create.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + create.Arg("key", "The key to act on").Required().StringVar(&c.key) + create.Arg("value", "The value to store, when empty reads STDIN").StringVar(&c.val) + + update := kv.Command("update", "Updates a key with a new value if the previous value matches the given revision").Action(c.updateAction) + update.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + update.Arg("key", "The key to act on").Required().StringVar(&c.key) + update.Arg("value", "The value to store, when empty reads STDIN").StringVar(&c.val) + update.Arg("revision", "The revision of the previous value in the bucket").Uint64Var(&c.revision) + del := kv.Command("del", "Deletes a key from the bucket, preserving history").Action(c.deleteAction) del.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) del.Arg("key", "The key to act on").Required().StringVar(&c.key) @@ -69,7 +76,6 @@ NOTE: This is an experimental feature. history := kv.Command("history", "Shows the full history for a key").Action(c.historyAction) history.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) history.Arg("key", "The key to act on").Required().StringVar(&c.key) - history.Flag("json", "JSON format output").Short('j').BoolVar(&c.asJson) add := kv.Command("add", "Adds a new KV store").Alias("new").Action(c.addAction) add.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) @@ -87,13 +93,14 @@ NOTE: This is an experimental feature. watch.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) watch.Arg("key", "The key to act on").StringVar(&c.key) - dump := kv.Command("dump", "Dumps the contents of the bucket as JSON").Action(c.dumpAction) - dump.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) - rm := kv.Command("rm", "Removes a bucket").Action(c.rmAction) rm.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) rm.Flag("force", "Act without confirmation").Short('f').BoolVar(&c.force) + rmHistory := kv.Command("compact", "Removes all historic values from the store where the last value is a delete").Action(c.compactAction) + rmHistory.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket) + rmHistory.Flag("force", "Act without confirmation").Short('f').BoolVar(&c.force) + cheats["kv"] = `# to create a replicated KV bucket nats kv add CONFIG --replicas 3 @@ -103,39 +110,44 @@ nats kv put CONFIG username bob # to read just the value with no additional details nats kv get CONFIG username --raw -# to see all values in the bucket -nats kv dump CONFIG - # to see the bucket status nats kv status CONFIG ` } +func (c *kvCommand) strForOp(op nats.KeyValueOp) string { + switch op { + case nats.KeyValuePut: + return "PUT" + case nats.KeyValuePurge: + return "PURGE" + case nats.KeyValueDelete: + return "DELETE" + default: + return "UNKNOWN" + } +} + func (c *kvCommand) historyAction(_ *kingpin.ParseContext) error { _, store, err := c.loadBucket() if err != nil { return err } - history, err := store.History(context.Background(), c.key) + history, err := store.History(c.key) if err != nil { return err } - if c.asJson { - printJSON(history) - return nil - } - table := newTableWriter(fmt.Sprintf("History for %s > %s", c.bucket, c.key)) - table.AddHeaders("Seq", "Op", "Created", "Length", "Value") + table.AddHeaders("Key", "Revision", "Op", "Created", "Length", "Value") for _, r := range history { val := base64IfNotPrintable(r.Value()) if len(val) > 40 { val = fmt.Sprintf("%s...%s", val[0:15], val[len(val)-15:]) } - table.AddRow(r.Sequence(), r.Operation(), r.Created().Format(time.RFC822), humanize.Comma(int64(len(r.Value()))), val) + table.AddRow(r.Key(), r.Revision(), c.strForOp(r.Operation()), r.Created().Format(time.RFC822), humanize.Comma(int64(len(r.Value()))), val) } fmt.Println(table.Render()) @@ -143,6 +155,27 @@ func (c *kvCommand) historyAction(_ *kingpin.ParseContext) error { return nil } +func (c *kvCommand) compactAction(_ *kingpin.ParseContext) error { + _, store, err := c.loadBucket() + if err != nil { + return err + } + + if !c.force { + ok, err := askConfirmation(fmt.Sprintf("Purge all historic values and audit trails for deleted keys in bucket %s?", c.bucket), false) + if err != nil { + return err + } + + if !ok { + fmt.Println("Skipping delete") + return nil + } + } + + return store.PurgeDeletes() +} + func (c *kvCommand) deleteAction(_ *kingpin.ParseContext) error { _, store, err := c.loadBucket() if err != nil { @@ -170,19 +203,25 @@ func (c *kvCommand) addAction(_ *kingpin.ParseContext) error { return err } - store, err := kv.NewBucket(nc, c.bucket, - kv.WithTTL(c.ttl), - kv.WithHistory(c.history), - kv.WithReplicas(c.replicas), - kv.WithPlacementCluster(c.cluster), - kv.WithMaxBucketSize(c.maxBucketSize), - kv.WithMaxValueSize(c.maxValueSize), - ) + js, err := nc.JetStream() if err != nil { return err } - return c.showStatus(store) + store, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: c.bucket, + MaxValueSize: c.maxValueSize, + History: uint8(c.history), + TTL: c.ttl, + MaxBytes: c.maxBucketSize, + Storage: nats.FileStorage, // TODO + Replicas: int(c.replicas), + }) + if err != nil { + return err + } + + return c.showStatus(store, js) } func (c *kvCommand) getAction(_ *kingpin.ParseContext) error { @@ -223,16 +262,12 @@ func (c *kvCommand) putAction(_ *kingpin.ParseContext) error { return err } - if c.val == "" { - var val []byte - val, err = ioutil.ReadAll(os.Stdin) - if err != nil { - return err - } - _, err = store.Put(c.key, val) - } else { - _, err = store.Put(c.key, []byte(c.val)) + val, err := c.valOrReadVal() + if err != nil { + return err } + + _, err = store.Put(c.key, val) if err != nil { return err } @@ -242,89 +277,122 @@ func (c *kvCommand) putAction(_ *kingpin.ParseContext) error { return err } -func (c *kvCommand) loadBucket() (*nats.Conn, kv.KV, error) { - nc, _, err := prepareHelper("", natsOpts()...) +func (c *kvCommand) createAction(_ *kingpin.ParseContext) error { + _, store, err := c.loadBucket() if err != nil { - return nil, nil, err + return err } - store, err := kv.NewClient(nc, c.bucket) + val, err := c.valOrReadVal() if err != nil { - return nil, nil, err + return err } - return nc, store, err -} - -func (c *kvCommand) statusAction(_ *kingpin.ParseContext) error { - _, store, err := c.loadBucket() + _, err = store.Create(c.key, val) if err != nil { return err } - return c.showStatus(store) + fmt.Println(c.val) + + return err } -func (c *kvCommand) watchAction(_ *kingpin.ParseContext) error { +func (c *kvCommand) updateAction(_ *kingpin.ParseContext) error { _, store, err := c.loadBucket() if err != nil { return err } - watch, err := store.Watch(context.Background(), c.key) + val, err := c.valOrReadVal() if err != nil { return err } - defer watch.Close() - for res := range watch.Channel() { - if res != nil { - if res.Operation() == kv.DeleteOperation { - fmt.Printf("[%s] %s %s > %s\n", res.Created().Format("2006-01-02 15:04:05"), color.RedString("DEL"), res.Bucket(), res.Key()) - } else { - fmt.Printf("[%s] %s %s > %s: %s\n", res.Created().Format("2006-01-02 15:04:05"), color.GreenString("PUT"), res.Bucket(), res.Key(), res.Value()) - } - } + _, err = store.Update(c.key, val, c.revision) + if err != nil { + return err } - return nil + fmt.Println(c.val) + + return err } -func (c *kvCommand) dumpAction(_ *kingpin.ParseContext) error { - _, store, err := c.loadBucket() +func (c *kvCommand) valOrReadVal() ([]byte, error) { + if c.val != "" { + return []byte(c.val), nil + } + + return ioutil.ReadAll(os.Stdin) +} + +func (c *kvCommand) loadBucket() (*nats.Conn, nats.KeyValue, error) { + nc, _, err := prepareHelper("", natsOpts()...) if err != nil { - return err + return nil, nil, err + } + + var opts []nats.JSOpt + + if jsDomain != "" { + opts = append(opts, nats.Domain(jsDomain)) + } + if jsApiPrefix != "" { + opts = append(opts, nats.APIPrefix(jsApiPrefix)) + } + + js, err := nc.JetStream(opts...) + if err != nil { + return nil, nil, err + } + + store, err := js.KeyValue(c.bucket) + if err != nil { + return nil, nil, err } - vals := make(map[string]kv.Entry) - watch, err := store.Watch(context.Background(), "") + return nc, store, err +} + +func (c *kvCommand) statusAction(_ *kingpin.ParseContext) error { + nc, store, err := c.loadBucket() if err != nil { return err } - for val := range watch.Channel() { - if val == nil { - break - } + js, err := nc.JetStream() + if err != nil { + return err + } - switch val.Operation() { - case kv.PutOperation: - vals[val.Key()] = val - case kv.DeleteOperation: - delete(vals, val.Key()) - } + return c.showStatus(store, js) +} - if val.Delta() == 0 { - break - } +func (c *kvCommand) watchAction(_ *kingpin.ParseContext) error { + _, store, err := c.loadBucket() + if err != nil { + return err } - j, err := json.MarshalIndent(vals, "", " ") + watch, err := store.Watch(c.key) if err != nil { return err } + defer watch.Stop() - fmt.Println(string(j)) + for res := range watch.Updates() { + if res != nil { + switch res.Operation() { + case nats.KeyValueDelete: + fmt.Printf("[%s] %s %s > %s\n", res.Created().Format("2006-01-02 15:04:05"), color.RedString(c.strForOp(res.Operation())), res.Bucket(), res.Key()) + case nats.KeyValuePurge: + fmt.Printf("[%s] %s %s > %s\n", res.Created().Format("2006-01-02 15:04:05"), color.RedString(c.strForOp(res.Operation())), res.Bucket(), res.Key()) + case nats.KeyValuePut: + fmt.Printf("[%s] %s %s > %s: %s\n", res.Created().Format("2006-01-02 15:04:05"), color.GreenString(c.strForOp(res.Operation())), res.Bucket(), res.Key(), res.Value()) + } + } + } return nil } @@ -363,44 +431,44 @@ func (c *kvCommand) rmAction(_ *kingpin.ParseContext) error { } } - _, store, err := c.loadBucket() + nc, _, err := prepareHelper("", natsOpts()...) if err != nil { return err } - return store.Destroy() -} - -func (c *kvCommand) showStatus(store kv.KV) error { - status, err := store.Status() + js, err := nc.JetStream() if err != nil { return err } - if c.asJson { - printJSON(status) - return nil + return js.DeleteKeyValue(c.bucket) +} + +func (c *kvCommand) showStatus(store nats.KeyValue, js nats.JetStreamContext) error { + nfo, err := js.StreamInfo(fmt.Sprintf("KV_%s", store.Bucket())) + if err != nil { + return err } - fmt.Printf("%s Key-Value Store Status\n", c.bucket) + fmt.Printf("%s Key-Value Store Status\n", store.Bucket()) fmt.Println() - fmt.Printf(" Bucket Name: %s\n", c.bucket) - fmt.Printf(" History Kept: %d\n", status.History()) - if status.MaxBucketSize() == -1 { + fmt.Printf(" Bucket Name: %s\n", store.Bucket()) + fmt.Printf(" History Kept: %d\n", nfo.Config.MaxMsgsPerSubject) + if nfo.Config.MaxBytes == -1 { fmt.Printf(" Maximum Bucket Size: unlimited\n") } else { - fmt.Printf(" Maximum Bucket Size: %d\n", status.MaxBucketSize()) + fmt.Printf(" Maximum Bucket Size: %d\n", nfo.Config.MaxBytes) } - if status.MaxValueSize() == -1 { + if nfo.Config.MaxBytes == -1 { fmt.Printf(" Maximum Value Size: unlimited\n") } else { - fmt.Printf(" Maximum Value Size: %d\n", status.MaxValueSize()) + fmt.Printf(" Maximum Value Size: %d\n", nfo.Config.MaxBytes) } - if status.BucketLocation() != "" { - fmt.Printf(" Bucket Location: %s\n", status.BucketLocation()) + if nfo.Cluster != nil && nfo.Cluster.Name != "" { + fmt.Printf(" Bucket Location: %s\n", nfo.Cluster.Name) } - fmt.Printf(" Values Stored: %d\n", status.Values()) - fmt.Printf(" Backing Store Name: %s\n", status.BackingStore()) + fmt.Printf(" Values Stored: %d\n", nfo.State.Msgs) + fmt.Printf(" Backing Store Name: %s\n", nfo.Config.Name) return nil } diff --git a/nats/kv_command_test.go b/nats/kv_command_test.go index 0e830abc..c692030e 100644 --- a/nats/kv_command_test.go +++ b/nats/kv_command_test.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "encoding/json" "fmt" "strings" "testing" @@ -16,10 +15,19 @@ func init() { skipContexts = true } -func createTestBucket(t *testing.T, nc *nats.Conn, opts ...kv.Option) kv.KV { +func createTestBucket(t *testing.T, nc *nats.Conn, cfg *nats.KeyValueConfig) nats.KeyValue { t.Helper() - store, err := kv.NewBucket(nc, "T", opts...) + if cfg == nil { + cfg = &nats.KeyValueConfig{Bucket: "T"} + } + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("js failed: %s", err) + } + + store, err := js.CreateKeyValue(cfg) if err != nil { t.Fatalf("new failed: %s", err) } @@ -27,7 +35,7 @@ func createTestBucket(t *testing.T, nc *nats.Conn, opts ...kv.Option) kv.KV { return store } -func mustPut(t *testing.T, store kv.KV, key string, value string) uint64 { +func mustPut(t *testing.T, store nats.KeyValue, key string, value string) uint64 { t.Helper() seq, err := store.Put(key, []byte(value)) @@ -42,7 +50,7 @@ func TestCLIKVGet(t *testing.T) { srv, nc, _ := setupJStreamTest(t) defer srv.Shutdown() - store := createTestBucket(t, nc) + store := createTestBucket(t, nc, nil) mustPut(t, store, "X.Y", "Y") out := runNatsCli(t, fmt.Sprintf("--server='%s' kv get T X.Y --raw", srv.ClientURL())) @@ -55,7 +63,7 @@ func TestCLIKVPut(t *testing.T) { srv, nc, _ := setupJStreamTest(t) defer srv.Shutdown() - store := createTestBucket(t, nc) + store := createTestBucket(t, nc, nil) out := runNatsCli(t, fmt.Sprintf("--server='%s' kv put T X VAL", srv.ClientURL())) if strings.TrimSpace(string(out)) != "VAL" { @@ -75,7 +83,7 @@ func TestCLIKVDel(t *testing.T) { srv, nc, _ := setupJStreamTest(t) defer srv.Shutdown() - store := createTestBucket(t, nc) + store := createTestBucket(t, nc, nil) mustPut(t, store, "X", "VAL") runNatsCli(t, fmt.Sprintf("--server='%s' kv del T X -f", srv.ClientURL())) @@ -87,7 +95,7 @@ func TestCLIKVDel(t *testing.T) { } func TestCLIAdd(t *testing.T) { - srv, nc, mgr := setupJStreamTest(t) + srv, _, mgr := setupJStreamTest(t) defer srv.Shutdown() runNatsCli(t, fmt.Sprintf("--server='%s' kv add T --history 5 --ttl 2m", srv.ClientURL())) @@ -99,48 +107,25 @@ func TestCLIAdd(t *testing.T) { t.Fatalf("stream was not created") } - store, err := kv.NewClient(nc, "T") - if err != nil { - t.Fatalf("client failed: %s", err) - } + stream, _ := mgr.LoadStream("KV_T") - status, err := store.Status() - if err != nil { - t.Fatalf("status failed: %s", err) - } + // TODO: needs status api + // js, err := nc.JetStream() + // if err != nil { + // t.Fatalf("js failed: %s", err) + // } + // + // status, err := store.Status() + // if err != nil { + // t.Fatalf("status failed: %s", err) + // } - if status.History() != 5 { - t.Fatalf("history is %d", status.History()) + if stream.MaxMsgsPerSubject() != 5 { + t.Fatalf("history is %d", stream.MaxMsgsPerSubject()) } - if status.TTL() != 2*time.Minute { - t.Fatalf("ttl is %v", status.TTL()) - } -} - -func TestCLIDump(t *testing.T) { - srv, nc, _ := setupJStreamTest(t) - defer srv.Shutdown() - - store := createTestBucket(t, nc) - mustPut(t, store, "X", "VALX") - mustPut(t, store, "Y", "VALY") - - out := runNatsCli(t, fmt.Sprintf("--server='%s' kv dump T", srv.ClientURL())) - var dumped map[string]kv.GenericEntry - err := json.Unmarshal(out, &dumped) - if err != nil { - t.Fatalf("json unmarshal failed: %s", err) - } - - if len(dumped) != 2 { - t.Fatalf("expected 2 entries got %d", len(dumped)) - } - if dumped["X"].Key != "X" && !bytes.Equal(dumped["X"].Val, []byte("VALX")) { - t.Fatalf("did not get right res: %+v", dumped["X"]) - } - if dumped["Y"].Key != "Y" && !bytes.Equal(dumped["Y"].Val, []byte("VALY")) { - t.Fatalf("did not get right res: %+v", dumped["Y"]) + if stream.MaxAge() != 2*time.Minute { + t.Fatalf("ttl is %v", stream.MaxAge()) } } @@ -148,14 +133,14 @@ func TestCLIPurge(t *testing.T) { srv, nc, _ := setupJStreamTest(t) defer srv.Shutdown() - store := createTestBucket(t, nc) + store := createTestBucket(t, nc, nil) mustPut(t, store, "X", "VALX") mustPut(t, store, "Y", "VALY") runNatsCli(t, fmt.Sprintf("--server='%s' kv purge T X -f", srv.ClientURL())) _, err := store.Get("X") - if err != kv.ErrUnknownKey { + if err != nats.ErrKeyNotFound { t.Fatalf("expected unknown key got: %v", err) } v, err := store.Get("Y") @@ -171,7 +156,7 @@ func TestCLIRM(t *testing.T) { srv, nc, mgr := setupJStreamTest(t) defer srv.Shutdown() - createTestBucket(t, nc) + createTestBucket(t, nc, nil) runNatsCli(t, fmt.Sprintf("--server='%s' kv rm T -f", srv.ClientURL()))