Skip to content

Commit

Permalink
nats.go kv and obj store
Browse files Browse the repository at this point in the history
This moves the KV backend to nats.go implementation
and adds a new experimental command "nats obj"

Signed-off-by: R.I.Pienaar <rip@devco.net>
  • Loading branch information
ripienaar committed Oct 13, 2021
1 parent 7ce9c60 commit af246b9
Show file tree
Hide file tree
Showing 8 changed files with 585 additions and 89 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -20,7 +20,7 @@ require (
github.com/klauspost/compress v1.13.4
github.com/nats-io/jsm.go v0.0.27-0.20211006163108-9aae04fb57e9
github.com/nats-io/nats-server/v2 v2.6.2-0.20211006145508-3f12216fcc34
github.com/nats-io/nats.go v1.13.0
github.com/nats-io/nats.go v1.13.1-0.20211013144704-0f47b1ceedfa
github.com/nats-io/nuid v1.0.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.26.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -119,8 +119,8 @@ github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgv
github.com/nats-io/nats-server/v2 v2.6.2-0.20211006145508-3f12216fcc34 h1:Qq2jwQrv/hyU6MmYZR5NYJ6wKVwRULJpn0CoDMtMUZg=
github.com/nats-io/nats-server/v2 v2.6.2-0.20211006145508-3f12216fcc34/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80=
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE=
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20211013144704-0f47b1ceedfa h1:wz8dAjZOLCe+lQNooGB+giVUc0VNZmvhhE8ptITjqKY=
github.com/nats-io/nats.go v1.13.1-0.20211013144704-0f47b1ceedfa/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
211 changes: 128 additions & 83 deletions nats/kv_command.go
@@ -1,3 +1,16 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
Expand Down Expand Up @@ -25,6 +38,7 @@ type kvCommand struct {
maxBucketSize int64
cluster string
revision uint64
description string
}

func configureKVCommand(app *kingpin.Application) {
Expand All @@ -42,16 +56,26 @@ NOTE: This is an experimental feature.

kv := app.Command("kv", help)

get := kv.Command("get", "Gets a value for a key").Action(c.getAction)
get.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket)
get.Arg("key", "The key to act on").Required().StringVar(&c.key)
get.Flag("raw", "Show only the value string").BoolVar(&c.raw)
add := kv.Command("add", "Adds a new KV Store Bucket").Alias("new").Action(c.addAction)
add.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket)
add.Flag("history", "How many historic values to keep per key").Default("1").Uint64Var(&c.history)
add.Flag("ttl", "How long to keep values for").DurationVar(&c.ttl)
add.Flag("replicas", "How many replicas of the data to store").Default("1").UintVar(&c.replicas)
add.Flag("cluster", "Place the bucket in a specific cluster").StringVar(&c.cluster)
add.Flag("max-value-size", "Maximum size for any single value").Int32Var(&c.maxValueSize)
add.Flag("max-bucket-size", "Maximum size for the bucket").Int64Var(&c.maxBucketSize)
add.Flag("description", "A description for the bucket").StringVar(&c.description)

put := kv.Command("put", "Puts a value into a key").Action(c.putAction)
put.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket)
put.Arg("key", "The key to act on").Required().StringVar(&c.key)
put.Arg("value", "The value to store, when empty reads STDIN").StringVar(&c.val)

get := kv.Command("get", "Gets a value for a key").Action(c.getAction)
get.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket)
get.Arg("key", "The key to act on").Required().StringVar(&c.key)
get.Flag("raw", "Show only the value string").BoolVar(&c.raw)

create := kv.Command("create", "Puts a value into a key only if the key is new or it's last operation was a delete").Action(c.createAction)
create.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket)
create.Arg("key", "The key to act on").Required().StringVar(&c.key)
Expand All @@ -77,15 +101,6 @@ NOTE: This is an experimental feature.
history.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket)
history.Arg("key", "The key to act on").Required().StringVar(&c.key)

add := kv.Command("add", "Adds a new KV store").Alias("new").Action(c.addAction)
add.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket)
add.Flag("history", "How many historic values to keep per key").Default("1").Uint64Var(&c.history)
add.Flag("ttl", "How long to keep values for").DurationVar(&c.ttl)
add.Flag("replicas", "How many replicas of the data to store").Default("1").UintVar(&c.replicas)
add.Flag("cluster", "Place the bucket in a specific cluster").StringVar(&c.cluster)
add.Flag("max-value-size", "Maximum size for any single value").Int32Var(&c.maxValueSize)
add.Flag("max-bucket-size", "Maximum size for the bucket").Int64Var(&c.maxBucketSize)

status := kv.Command("status", "View the status of a KV store").Alias("view").Alias("info").Action(c.statusAction)
status.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket)

Expand All @@ -101,6 +116,9 @@ NOTE: This is an experimental feature.
rmHistory.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket)
rmHistory.Flag("force", "Act without confirmation").Short('f').BoolVar(&c.force)

upgrade := kv.Command("upgrade", "Upgrades a early tech-preview bucket to current format").Action(c.upgradeAction)
upgrade.Arg("bucket", "The bucket to act on").Required().StringVar(&c.bucket)

cheats["kv"] = `# to create a replicated KV bucket
nats kv add CONFIG --replicas 3
Expand Down Expand Up @@ -128,8 +146,40 @@ func (c *kvCommand) strForOp(op nats.KeyValueOp) string {
}
}

func (c *kvCommand) upgradeAction(_ *kingpin.ParseContext) error {
_, js, store, err := c.loadBucket()
if err != nil {
return err
}

status, err := store.Status()
if err != nil {
return err
}

nfo := status.(*nats.KeyValueBucketStatus).StreamInfo()
if nfo.Config.AllowRollup {
fmt.Println("Bucket is already using the correct configuration")
}

nfo.Config.AllowRollup = true
nfo, err = js.UpdateStream(&nfo.Config)
if err != nil {
return err
}

if !nfo.Config.AllowRollup {
fmt.Printf("Configuration upgrade failed, please edit stream %s to allow RollUps", nfo.Config.Name)
os.Exit(1)
}

fmt.Printf("Bucket %s has been upgraded\n\n", status.Bucket())

return c.showStatus(store)
}

func (c *kvCommand) historyAction(_ *kingpin.ParseContext) error {
_, store, err := c.loadBucket()
_, _, store, err := c.loadBucket()
if err != nil {
return err
}
Expand All @@ -156,7 +206,7 @@ func (c *kvCommand) historyAction(_ *kingpin.ParseContext) error {
}

func (c *kvCommand) compactAction(_ *kingpin.ParseContext) error {
_, store, err := c.loadBucket()
_, _, store, err := c.loadBucket()
if err != nil {
return err
}
Expand All @@ -177,7 +227,7 @@ func (c *kvCommand) compactAction(_ *kingpin.ParseContext) error {
}

func (c *kvCommand) deleteAction(_ *kingpin.ParseContext) error {
_, store, err := c.loadBucket()
_, _, store, err := c.loadBucket()
if err != nil {
return err
}
Expand Down Expand Up @@ -210,6 +260,7 @@ func (c *kvCommand) addAction(_ *kingpin.ParseContext) error {

store, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: c.bucket,
Description: c.description,
MaxValueSize: c.maxValueSize,
History: uint8(c.history),
TTL: c.ttl,
Expand All @@ -221,11 +272,11 @@ func (c *kvCommand) addAction(_ *kingpin.ParseContext) error {
return err
}

return c.showStatus(store, js)
return c.showStatus(store)
}

func (c *kvCommand) getAction(_ *kingpin.ParseContext) error {
_, store, err := c.loadBucket()
_, _, store, err := c.loadBucket()
if err != nil {
return err
}
Expand Down Expand Up @@ -257,7 +308,7 @@ func (c *kvCommand) getAction(_ *kingpin.ParseContext) error {
}

func (c *kvCommand) putAction(_ *kingpin.ParseContext) error {
_, store, err := c.loadBucket()
_, _, store, err := c.loadBucket()
if err != nil {
return err
}
Expand All @@ -278,7 +329,7 @@ func (c *kvCommand) putAction(_ *kingpin.ParseContext) error {
}

func (c *kvCommand) createAction(_ *kingpin.ParseContext) error {
_, store, err := c.loadBucket()
_, _, store, err := c.loadBucket()
if err != nil {
return err
}
Expand All @@ -299,7 +350,7 @@ func (c *kvCommand) createAction(_ *kingpin.ParseContext) error {
}

func (c *kvCommand) updateAction(_ *kingpin.ParseContext) error {
_, store, err := c.loadBucket()
_, _, store, err := c.loadBucket()
if err != nil {
return err
}
Expand Down Expand Up @@ -327,50 +378,31 @@ func (c *kvCommand) valOrReadVal() ([]byte, error) {
return ioutil.ReadAll(os.Stdin)
}

func (c *kvCommand) loadBucket() (*nats.Conn, nats.KeyValue, error) {
nc, _, err := prepareHelper("", natsOpts()...)
func (c *kvCommand) loadBucket() (*nats.Conn, nats.JetStreamContext, nats.KeyValue, error) {
nc, js, err := prepareJSHelper("", natsOpts()...)
if err != nil {
return nil, nil, err
}

var opts []nats.JSOpt

if jsDomain != "" {
opts = append(opts, nats.Domain(jsDomain))
}
if jsApiPrefix != "" {
opts = append(opts, nats.APIPrefix(jsApiPrefix))
}

js, err := nc.JetStream(opts...)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

store, err := js.KeyValue(c.bucket)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

return nc, store, err
return nc, js, store, err
}

func (c *kvCommand) statusAction(_ *kingpin.ParseContext) error {
nc, store, err := c.loadBucket()
if err != nil {
return err
}

js, err := nc.JetStream()
_, _, store, err := c.loadBucket()
if err != nil {
return err
}

return c.showStatus(store, js)
return c.showStatus(store)
}

func (c *kvCommand) watchAction(_ *kingpin.ParseContext) error {
_, store, err := c.loadBucket()
_, _, store, err := c.loadBucket()
if err != nil {
return err
}
Expand All @@ -382,23 +414,25 @@ func (c *kvCommand) watchAction(_ *kingpin.ParseContext) error {
defer watch.Stop()

for res := range watch.Updates() {
if res != nil {
switch res.Operation() {
case nats.KeyValueDelete:
fmt.Printf("[%s] %s %s > %s\n", res.Created().Format("2006-01-02 15:04:05"), color.RedString(c.strForOp(res.Operation())), res.Bucket(), res.Key())
case nats.KeyValuePurge:
fmt.Printf("[%s] %s %s > %s\n", res.Created().Format("2006-01-02 15:04:05"), color.RedString(c.strForOp(res.Operation())), res.Bucket(), res.Key())
case nats.KeyValuePut:
fmt.Printf("[%s] %s %s > %s: %s\n", res.Created().Format("2006-01-02 15:04:05"), color.GreenString(c.strForOp(res.Operation())), res.Bucket(), res.Key(), res.Value())
}
if res == nil {
continue
}

switch res.Operation() {
case nats.KeyValueDelete:
fmt.Printf("[%s] %s %s > %s\n", res.Created().Format("2006-01-02 15:04:05"), color.RedString(c.strForOp(res.Operation())), res.Bucket(), res.Key())
case nats.KeyValuePurge:
fmt.Printf("[%s] %s %s > %s\n", res.Created().Format("2006-01-02 15:04:05"), color.RedString(c.strForOp(res.Operation())), res.Bucket(), res.Key())
case nats.KeyValuePut:
fmt.Printf("[%s] %s %s > %s: %s\n", res.Created().Format("2006-01-02 15:04:05"), color.GreenString(c.strForOp(res.Operation())), res.Bucket(), res.Key(), res.Value())
}
}

return nil
}

func (c *kvCommand) purgeAction(_ *kingpin.ParseContext) error {
_, store, err := c.loadBucket()
_, _, store, err := c.loadBucket()
if err != nil {
return err
}
Expand Down Expand Up @@ -431,44 +465,55 @@ func (c *kvCommand) rmAction(_ *kingpin.ParseContext) error {
}
}

nc, _, err := prepareHelper("", natsOpts()...)
if err != nil {
return err
}

js, err := nc.JetStream()
_, js, err := prepareJSHelper("", natsOpts()...)
if err != nil {
return err
}

return js.DeleteKeyValue(c.bucket)
}

func (c *kvCommand) showStatus(store nats.KeyValue, js nats.JetStreamContext) error {
nfo, err := js.StreamInfo(fmt.Sprintf("KV_%s", store.Bucket()))
func (c *kvCommand) showStatus(store nats.KeyValue) error {
status, err := store.Status()
if err != nil {
return err
}

fmt.Printf("%s Key-Value Store Status\n", store.Bucket())
fmt.Printf("%s Key-Value Store Status\n", status.Bucket())
fmt.Println()
fmt.Printf(" Bucket Name: %s\n", store.Bucket())
fmt.Printf(" History Kept: %d\n", nfo.Config.MaxMsgsPerSubject)
if nfo.Config.MaxBytes == -1 {
fmt.Printf(" Maximum Bucket Size: unlimited\n")
} else {
fmt.Printf(" Maximum Bucket Size: %d\n", nfo.Config.MaxBytes)
}
if nfo.Config.MaxBytes == -1 {
fmt.Printf(" Maximum Value Size: unlimited\n")
} else {
fmt.Printf(" Maximum Value Size: %d\n", nfo.Config.MaxBytes)
}
if nfo.Cluster != nil && nfo.Cluster.Name != "" {
fmt.Printf(" Bucket Location: %s\n", nfo.Cluster.Name)
fmt.Printf(" Bucket Name: %s\n", status.Bucket())
fmt.Printf(" History Kept: %d\n", status.History())
fmt.Printf(" Values Stored: %d\n", status.Values())
fmt.Printf(" Backing Store Kind: %s\n", status.BackingStore())

if status.BackingStore() == "JetStream" {
nfo := status.(*nats.KeyValueBucketStatus).StreamInfo()
if nfo.Config.Description != "" {
fmt.Printf(" Description: %s\n", nfo.Config.Description)
}
if nfo.Config.MaxBytes == -1 {
fmt.Printf(" Maximum Bucket Size: unlimited\n")
} else {
fmt.Printf(" Maximum Bucket Size: %d\n", nfo.Config.MaxBytes)
}
if nfo.Config.MaxBytes == -1 {
fmt.Printf(" Maximum Value Size: unlimited\n")
} else {
fmt.Printf(" Maximum Value Size: %d\n", nfo.Config.MaxMsgSize)
}
fmt.Printf(" JetStream Stream: %s\n", nfo.Config.Name)
if nfo.Cluster != nil {
fmt.Printf(" Cluster Location: %s\n", nfo.Cluster.Name)
}

if !nfo.Config.AllowRollup {
fmt.Println()
fmt.Println("Warning the bucket does not support roll-ups")
fmt.Println("and needs a configuration upgrade.")
fmt.Println()
fmt.Printf("Please run: nats kv upgrade %s\n\n", status.Bucket())
}
}
fmt.Printf(" Values Stored: %d\n", nfo.State.Msgs)
fmt.Printf(" Backing Store Name: %s\n", nfo.Config.Name)

return nil
}
1 change: 1 addition & 0 deletions nats/main.go
Expand Up @@ -102,6 +102,7 @@ See 'nats cheat' for a quick cheatsheet of commands
configureEventsCommand(ncli)
configureGovernorCommand(ncli)
configureKVCommand(ncli)
configureObjectCommand(ncli)
configureLatencyCommand(ncli)
configurePubCommand(ncli)
configureRTTCommand(ncli)
Expand Down

0 comments on commit af246b9

Please sign in to comment.