Skip to content

Commit

Permalink
Major rework of ObjectStore.
Browse files Browse the repository at this point in the history
Reorganized interfaces to be cleaner, also made KV same with Manager interface instead of direct nats.Conn functions.
Added Watch, Links and better worked out some logic.

This is ready now for a careful review.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Oct 3, 2021
1 parent 7ff211e commit 5c01667
Show file tree
Hide file tree
Showing 5 changed files with 735 additions and 337 deletions.
154 changes: 81 additions & 73 deletions js.go
Expand Up @@ -30,78 +30,6 @@ import (
"github.com/nats-io/nuid"
)

// Request API subjects for JetStream.
const (
// defaultAPIPrefix is the default prefix for the JetStream API.
defaultAPIPrefix = "$JS.API."

// jsDomainT is used to create JetStream API prefix by specifying only Domain
jsDomainT = "$JS.%s.API."

// apiAccountInfo is for obtaining general information about JetStream.
apiAccountInfo = "INFO"

// apiConsumerCreateT is used to create consumers.
apiConsumerCreateT = "CONSUMER.CREATE.%s"

// apiDurableCreateT is used to create durable consumers.
apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"

// apiConsumerInfoT is used to create consumers.
apiConsumerInfoT = "CONSUMER.INFO.%s.%s"

// apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"

// apiDeleteConsumerT is used to delete consumers.
apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"

// apiConsumerListT is used to return all detailed consumer information
apiConsumerListT = "CONSUMER.LIST.%s"

// apiConsumerNamesT is used to return a list with all consumer names for the stream.
apiConsumerNamesT = "CONSUMER.NAMES.%s"

// apiStreams can lookup a stream by subject.
apiStreams = "STREAM.NAMES"

// apiStreamCreateT is the endpoint to create new streams.
apiStreamCreateT = "STREAM.CREATE.%s"

// apiStreamInfoT is the endpoint to get information on a stream.
apiStreamInfoT = "STREAM.INFO.%s"

// apiStreamUpdate is the endpoint to update existing streams.
apiStreamUpdateT = "STREAM.UPDATE.%s"

// apiStreamDeleteT is the endpoint to delete streams.
apiStreamDeleteT = "STREAM.DELETE.%s"

// apiPurgeStreamT is the endpoint to purge streams.
apiStreamPurgeT = "STREAM.PURGE.%s"

// apiStreamListT is the endpoint that will return all detailed stream information
apiStreamList = "STREAM.LIST"

// apiMsgGetT is the endpoint to get a message.
apiMsgGetT = "STREAM.MSG.GET.%s"

// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"

// orderedHeartbeatsInterval is how fast we want HBs from the server during idle.
orderedHeartbeatsInterval = 5 * time.Second

// Scale for threshold of missed HBs or lack of activity.
hbcThresh = 2
)

// Types of control messages, so far heartbeat and flow control
const (
jsCtrlHB = 1
jsCtrlFC = 2
)

// JetStream allows persistent messaging through JetStream.
type JetStream interface {
// Publish publishes a message to JetStream.
Expand Down Expand Up @@ -177,9 +105,82 @@ type JetStream interface {
type JetStreamContext interface {
JetStream
JetStreamManager
ObjectStore
KeyValueManager
ObjectStoreManager
}

// Request API subjects for JetStream.
const (
// defaultAPIPrefix is the default prefix for the JetStream API.
defaultAPIPrefix = "$JS.API."

// jsDomainT is used to create JetStream API prefix by specifying only Domain
jsDomainT = "$JS.%s.API."

// apiAccountInfo is for obtaining general information about JetStream.
apiAccountInfo = "INFO"

// apiConsumerCreateT is used to create consumers.
apiConsumerCreateT = "CONSUMER.CREATE.%s"

// apiDurableCreateT is used to create durable consumers.
apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"

// apiConsumerInfoT is used to create consumers.
apiConsumerInfoT = "CONSUMER.INFO.%s.%s"

// apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"

// apiDeleteConsumerT is used to delete consumers.
apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"

// apiConsumerListT is used to return all detailed consumer information
apiConsumerListT = "CONSUMER.LIST.%s"

// apiConsumerNamesT is used to return a list with all consumer names for the stream.
apiConsumerNamesT = "CONSUMER.NAMES.%s"

// apiStreams can lookup a stream by subject.
apiStreams = "STREAM.NAMES"

// apiStreamCreateT is the endpoint to create new streams.
apiStreamCreateT = "STREAM.CREATE.%s"

// apiStreamInfoT is the endpoint to get information on a stream.
apiStreamInfoT = "STREAM.INFO.%s"

// apiStreamUpdate is the endpoint to update existing streams.
apiStreamUpdateT = "STREAM.UPDATE.%s"

// apiStreamDeleteT is the endpoint to delete streams.
apiStreamDeleteT = "STREAM.DELETE.%s"

// apiPurgeStreamT is the endpoint to purge streams.
apiStreamPurgeT = "STREAM.PURGE.%s"

// apiStreamListT is the endpoint that will return all detailed stream information
apiStreamList = "STREAM.LIST"

// apiMsgGetT is the endpoint to get a message.
apiMsgGetT = "STREAM.MSG.GET.%s"

// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"

// orderedHeartbeatsInterval is how fast we want HBs from the server during idle.
orderedHeartbeatsInterval = 5 * time.Second

// Scale for threshold of missed HBs or lack of activity.
hbcThresh = 2
)

// Types of control messages, so far heartbeat and flow control
const (
jsCtrlHB = 1
jsCtrlFC = 2
)

// js is an internal struct from a JetStreamContext.
type js struct {
nc *Conn
Expand Down Expand Up @@ -312,6 +313,13 @@ const (
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgRollup = "Nats-Rollup"
)

// Rollups, can be subject only or all messages.
const (
MsgRollupSubject = "sub"
MsgRollupAll = "all"
)

// PublishMsg publishes a Msg to a stream from JetStream.
Expand Down
70 changes: 20 additions & 50 deletions kv.go
Expand Up @@ -15,14 +15,22 @@ package nats

import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
)

type KeyValueManager interface {
// KeyValue will lookup and bind to an existing KeyValue store.
KeyValue(bucket string) (KeyValue, error)
// CreateKeyValue will create a KeyValue store with the following configuration.
CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
// DeleteKeyValue will delete this KeyValue store (JetStream stream).
DeleteKeyValue(bucket string) error
}

type KeyValue interface {
// Get returns the latest value for the key.
Get(key string) (entry KeyValueEntry, err error)
Expand Down Expand Up @@ -107,19 +115,15 @@ const (
)

// KeyValue will lookup and bind to an existing KeyValue store.
func (nc *Conn) KeyValue(bucket string, opts ...JSOpt) (KeyValue, error) {
func (js *js) KeyValue(bucket string) (KeyValue, error) {
if bucket == _EMPTY_ {
return nil, ErrBucketNameRequired
}
if strings.Contains(bucket, ".") {
return nil, ErrInvalidBucketName
}
jsc, err := nc.JetStream(opts...)
if err != nil {
return nil, err
}
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
si, err := jsc.StreamInfo(stream)
si, err := js.StreamInfo(stream)
if err != nil {
if err == ErrStreamNotFound {
err = ErrBucketNotFound
Expand All @@ -136,26 +140,20 @@ func (nc *Conn) KeyValue(bucket string, opts ...JSOpt) (KeyValue, error) {
name: bucket,
stream: stream,
pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
nc: nc,
js: jsc.(*js),
js: js,
}
return kv, nil
}

// AddKeyValue will create a KeyValue store with the following configuration.
func (nc *Conn) AddKeyValue(cfg *KeyValueConfig, opts ...JSOpt) (KeyValue, error) {
// CreateKeyValue will create a KeyValue store with the following configuration.
func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
if cfg == nil || cfg.Bucket == _EMPTY_ {
return nil, ErrBucketNameRequired
}
if strings.Contains(cfg.Bucket, ".") {
return nil, ErrInvalidBucketName
}

jsc, err := nc.JetStream(opts...)
if err != nil {
return nil, err
}
if _, err = jsc.AccountInfo(); err != nil {
if _, err := js.AccountInfo(); err != nil {
return nil, err
}

Expand All @@ -181,35 +179,29 @@ func (nc *Conn) AddKeyValue(cfg *KeyValueConfig, opts ...JSOpt) (KeyValue, error
Replicas: replicas,
}

if _, err := jsc.AddStream(scfg); err != nil {
if _, err := js.AddStream(scfg); err != nil {
return nil, err
}

kv := &kvs{
name: cfg.Bucket,
stream: scfg.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket),
nc: nc,
js: jsc.(*js),
js: js,
}
return kv, nil
}

// DeleteKeyValue will delete this KeyValue store (JetStream stream).
func (nc *Conn) DeleteKeyValue(bucket string, opts ...JSOpt) error {
js, err := nc.JetStream(opts...)
if err != nil {
return err
}
func (js *js) DeleteKeyValue(bucket string) error {
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
return js.DeleteStream(stream, opts...)
return js.DeleteStream(stream)
}

type kvs struct {
name string
stream string
pre string
nc *Conn
js *js
}

Expand Down Expand Up @@ -316,14 +308,6 @@ type purgeRequest struct {

// Delete the key and all revisions.
func (kv *kvs) Delete(key string) error {
o, cancel, err := getJSContextOpts(kv.js.opts)
if err != nil {
return err
}
if cancel != nil {
defer cancel()
}

var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(key)
Expand All @@ -335,25 +319,11 @@ func (kv *kvs) Delete(key string) error {
if err != nil {
return err
}
preq, err := json.Marshal(&purgeRequest{Subject: b.String(), Keep: 1})
err = kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String(), Keep: 1})
if err != nil {
return err
}

// Send request.
subj := kv.js.apiSubj(fmt.Sprintf(apiStreamPurgeT, kv.stream))
r, err := kv.nc.RequestWithContext(o.ctx, subj, preq)
if err != nil {
return err
}
var resp streamPurgeResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}

// Double check the pubAck future.
select {
case <-paf.Ok():
Expand Down

0 comments on commit 5c01667

Please sign in to comment.