Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NATS JetStream driver #115

Merged
merged 8 commits into from Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Expand Up @@ -9,8 +9,7 @@ require (
github.com/klauspost/compress v1.14.4
github.com/lib/pq v1.10.2
github.com/mattn/go-sqlite3 v1.14.8
github.com/nats-io/jsm.go v0.0.29
github.com/nats-io/nats-server/v2 v2.7.4 // indirect
github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee
github.com/nats-io/nats.go v1.13.1-0.20220314223702-e483e46e5b02
github.com/pkg/errors v0.9.1
github.com/rancher/wrangler v0.8.3
Expand Down
15 changes: 4 additions & 11 deletions go.sum
Expand Up @@ -242,7 +242,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangplus/bytes v0.0.0-20160111154220-45c989fe5450/go.mod h1:Bk6SMAONeMXrxql8uvOKuAZSu8aM5RUGv+1C6IJaEho=
github.com/golangplus/fmt v0.0.0-20150411045040-2a5d6d7d2995/go.mod h1:lJgMEyOkYFkPcDKwRXegd+iM6E7matEszMG5HhwytU8=
github.com/golangplus/testing v0.0.0-20180327235837-af21d9c3145e/go.mod h1:0AA//k/eakGydO4jKRoRL2j92ZKSzTgj9tclaCrvXHk=
Expand Down Expand Up @@ -339,8 +338,6 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.14.3/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -375,7 +372,6 @@ github.com/mattn/go-sqlite3 v1.14.8/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
Expand All @@ -401,15 +397,12 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/nats-io/jsm.go v0.0.29 h1:5y4WaH5OkhknpU35/ej8ZGfWQ6FzugklvlUBGj6EJNo=
github.com/nats-io/jsm.go v0.0.29/go.mod h1:ez2gzt0p1YhQXJlzYDZkkoxAQpl6HHpnEI4/GBDzzQA=
github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee h1:+l6i7zS8N1LOokm7dzShezI9STRGrzp0O49Pw8Jetdk=
github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee/go.mod h1:EKSYvbvWAoh0hIfuZ+ieWm8u0VOTRTeDfuQvNPKRqEg=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.3-0.20220217204130-58806c1290b3/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.4 h1:c+BZJ3rGzUKCBIM4IXO8uNT2u1vajGbD1kPA6wqCEaM=
github.com/nats-io/nats-server/v2 v2.7.4/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20220216000616-0096b1bfae8d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72 h1:Moe/K4fo/5FCNpE/TYrMt7sEPUuldBVJ0D4g/SWFkd0=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc=
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20220314223702-e483e46e5b02 h1:4J1KOtRZywvAwdINduu5Jp6rOMxPA/WDzkNCtLlfRvk=
github.com/nats-io/nats.go v1.13.1-0.20220314223702-e483e46e5b02/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
Expand Down
99 changes: 51 additions & 48 deletions pkg/drivers/jetstream/jetstream.go
Expand Up @@ -47,11 +47,12 @@ type JSValue struct {
}

// New get the JetStream Backend, establish connection to NATS JetStream. At the moment nats.go does not have
// connection string support so kine will use nats://(token|username:password)hostname:port?bucket=bucketName&context=nats-context`
// connection string support so kine will use nats://(token|username:password)hostname:port?bucket=bucketName&contextFile=nats-context`.
// If contextFile is provided then do not provide a hostname:port in the endpoint URL
//
// bucket: specifies the bucket on the nats server for all of the k3s values for this cluster (optional)
//
// context: specifies the nats context to load from ~/.config/nats/context/ e.g. nats-context for ~/.config/nats/context/nats-context.json
// contextFile: specifies the nats context to load from ~/.config/nats/context/ e.g. nats-context for ~/.config/nats/context/nats-context.json
matthewdevenny marked this conversation as resolved.
Show resolved Hide resolved
//
// Multiple urls can be passed in a comma separated format - only the first in the list will be evaluated for query parameters,
// While auth is valid in the url, the preferred way to pass auth is through a file. If user/pass or token are provided in the
Expand Down Expand Up @@ -131,35 +132,10 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac
// parseNatsConnection returns nats connection url, bucketName and []nats.Option, error
func parseNatsConnection(dsn string, tlsInfo tls.Config) (string, string, []nats.Option, error) {

o := make([]nats.Option, 0)

connections := strings.Split(dsn, ",")
bucketName := kineBucket

connections := strings.Split(dsn, ",")

connBuilder := strings.Builder{}
for idx, c := range connections {
if idx > 0 {
connBuilder.WriteString(",")
}
u, err := url.Parse(c)
if err != nil {
return "", "", nil, err
}
if u.Scheme != "nats" {
return "", "", nil, fmt.Errorf("invalid connection string=%s", c)
}
connBuilder.WriteString("nats://")
if u.User != nil && idx == 0 {
userInfo := strings.Split(u.User.String(), ":")
if len(userInfo) > 1 {
o = append(o, nats.UserInfo(userInfo[0], userInfo[1]))
} else {
o = append(o, nats.Token(userInfo[0]))
}
}
connBuilder.WriteString(u.Host)
}
opts := make([]nats.Option, 0)

u, err := url.Parse(connections[0])
if err != nil {
Expand All @@ -175,43 +151,70 @@ func parseNatsConnection(dsn string, tlsInfo tls.Config) (string, string, []nats
bucketName = b[0]
}

clientCertFile := ""
clientKeyFile := ""

if tlsInfo.CertFile != "" {
clientCertFile = tlsInfo.CertFile
contextFile, hasContext := queryMap["contextFile"]
if hasContext && u.Host != "" {
return "", "", nil, fmt.Errorf("when using context endpoint should be nats://?contextFile=<context-file.json>&bucket=bucketName")
}

if tlsInfo.KeyFile != "" {
clientKeyFile = tlsInfo.KeyFile
}

if clientCertFile != "" && clientKeyFile != "" {
o = append(o, nats.ClientCert(clientCertFile, clientKeyFile))
if tlsInfo.KeyFile != "" && tlsInfo.CertFile != "" {
opts = append(opts, nats.ClientCert(tlsInfo.CertFile, tlsInfo.KeyFile))
}

if tlsInfo.CAFile != "" {
o = append(o, nats.RootCAs(tlsInfo.CAFile))
opts = append(opts, nats.RootCAs(tlsInfo.CAFile))
}

if ctxOpt, ok := queryMap["context"]; ok {
logrus.Infof("loading nats context=%s", ctxOpt[0])
natsContext, err := natscontext.New(ctxOpt[0], true)
if hasContext {
logrus.Infof("loading nats contextFile=%s", contextFile[0])

natsContext, err := natscontext.NewFromFile(contextFile[0])
if err != nil {
return "", "", nil, err
}

connections = strings.Split(natsContext.ServerURL(), ",")

// command line options provided to kine will override the file
// https://github.com/nats-io/jsm.go/blob/v0.0.29/natscontext/context.go#L257
// allows for user, creds, nke, token, certifcate, ca, inboxprefix from the context.json
natsClientOpts, err := natsContext.NATSOptions(o...)
natsClientOpts, err := natsContext.NATSOptions(opts...)
if err != nil {
return "", "", nil, err
}
o = natsClientOpts
opts = natsClientOpts
}
logrus.Infof("using provided options=%v", o)

return connBuilder.String(), bucketName, o, nil
connBuilder := strings.Builder{}
for idx, c := range connections {
if idx > 0 {
connBuilder.WriteString(",")
}

u, err := url.Parse(c)
if err != nil {
return "", "", nil, err
}

if u.Scheme != "nats" {
return "", "", nil, fmt.Errorf("invalid connection string=%s", c)
}

connBuilder.WriteString("nats://")

if u.User != nil && idx == 0 {
userInfo := strings.Split(u.User.String(), ":")
if len(userInfo) > 1 {
opts = append(opts, nats.UserInfo(userInfo[0], userInfo[1]))
} else {
opts = append(opts, nats.Token(userInfo[0]))
}
}
connBuilder.WriteString(u.Host)
}

logrus.Infof("using provided options=%v", opts)

return connBuilder.String(), bucketName, opts, nil
}

func (j *JetStream) Start(ctx context.Context) error {
Expand Down