Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] KeyValue and ObjectStore support for JetStream #832

Merged
merged 1 commit into from Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6
github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
19 changes: 5 additions & 14 deletions go_test.sum
Expand Up @@ -15,33 +15,24 @@ github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEE
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 h1:TYI6K487xhbbpKjz4gIIVBWL6l2gFI3JHu/N0XySwRY=
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6/go.mod h1:xZLDZ6cRUu9FCh7+mKXGEy16O66CdWVxttxNIiUuNCk=
github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/jwt/v2 v2.1.0 h1:1UbfD5g1xTdWmSeRV8bh/7u+utTiBsRtWhLl1PixZp4=
github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da h1:0snsE4pD2VKIsFiRMRkHFY+SJZVbT7/eZJ1lOt5XuLA=
github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80=
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
181 changes: 102 additions & 79 deletions js.go
Expand Up @@ -30,6 +30,85 @@ import (
"github.com/nats-io/nuid"
)

// JetStream allows persistent messaging through JetStream.
type JetStream interface {
// Publish publishes a message to JetStream.
Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)

// PublishMsg publishes a Msg to JetStream.
PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)

// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
// The data should not be changed until the PubAckFuture has been processed.
PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)

// PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture.
// The message should not be changed until the PubAckFuture has been processed.
PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)

// PublishAsyncPending returns the number of async publishes outstanding for this context.
PublishAsyncPending() int

// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
PublishAsyncComplete() <-chan struct{}

// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
// you can provide the stream name with nats.BindStream().
// If no stream name is specified, the library will attempt to figure out which
// stream the subscription is for. See important notes below for more details.
//
// IMPORTANT NOTES:
// * If none of the options Bind() nor Durable() are specified, the library will
// send a request to the server to create an ephemeral JetStream consumer,
// which will be deleted after an Unsubscribe() or Drain(), or automatically
// by the server after a short period of time after the NATS subscription is
// gone.
// * If Durable() option is specified, the library will attempt to lookup a JetStream
// consumer with this name, and if found, will bind to it and not attempt to
// delete it. However, if not found, the library will send a request to create
// such durable JetStream consumer. The library will delete the JetStream consumer
// after an Unsubscribe() or Drain().
// * If Bind() option is provided, the library will attempt to lookup the
// consumer with the given name, and if successful, bind to it. If the lookup fails,
// then the Subscribe() call will return an error.
Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

// SubscribeSync creates a Subscription that can be used to process messages synchronously.
// See important note in Subscribe()
SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)

// ChanSubscribe creates channel based Subscription.
// See important note in Subscribe()
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// ChanQueueSubscribe creates channel based Subscription with a queue group.
// See important note in QueueSubscribe()
ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// QueueSubscribe creates a Subscription with a queue group.
// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
// See important note in Subscribe()
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
// See important note in QueueSubscribe()
QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)

// PullSubscribe creates a Subscription that can fetch messages.
// See important note in Subscribe()
PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
}

// JetStreamContext allows JetStream messaging and stream management.
type JetStreamContext interface {
JetStream
JetStreamManager
KeyValueManager
ObjectStoreManager
}

// Request API subjects for JetStream.
const (
// defaultAPIPrefix is the default prefix for the JetStream API.
Expand Down Expand Up @@ -110,84 +189,6 @@ const (
jsCtrlFC = 2
)

// JetStream allows persistent messaging through JetStream.
type JetStream interface {
// Publish publishes a message to JetStream.
Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)

// PublishMsg publishes a Msg to JetStream.
PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)

// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
// The data should not be changed until the PubAckFuture has been processed.
PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)

// PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture.
// The message should not be changed until the PubAckFuture has been processed.
PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)

// PublishAsyncPending returns the number of async publishes outstanding for this context.
PublishAsyncPending() int

// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
PublishAsyncComplete() <-chan struct{}

// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
// you can provide the stream name with nats.BindStream().
// If no stream name is specified, the library will attempt to figure out which
// stream the subscription is for. See important notes below for more details.
//
// IMPORTANT NOTES:
// * If none of the options Bind() nor Durable() are specified, the library will
// send a request to the server to create an ephemeral JetStream consumer,
// which will be deleted after an Unsubscribe() or Drain(), or automatically
// by the server after a short period of time after the NATS subscription is
// gone.
// * If Durable() option is specified, the library will attempt to lookup a JetStream
// consumer with this name, and if found, will bind to it and not attempt to
// delete it. However, if not found, the library will send a request to create
// such durable JetStream consumer. The library will delete the JetStream consumer
// after an Unsubscribe() or Drain().
// * If Bind() option is provided, the library will attempt to lookup the
// consumer with the given name, and if successful, bind to it. If the lookup fails,
// then the Subscribe() call will return an error.
Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

// SubscribeSync creates a Subscription that can be used to process messages synchronously.
// See important note in Subscribe()
SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)

// ChanSubscribe creates channel based Subscription.
// See important note in Subscribe()
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// ChanQueueSubscribe creates channel based Subscription with a queue group.
// See important note in QueueSubscribe()
ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// QueueSubscribe creates a Subscription with a queue group.
// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
// In that case, the queue name cannot contain dots ".", same restriction that is applied to a durable name.
// See important note in Subscribe()
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
// See important note in QueueSubscribe()
QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)

// PullSubscribe creates a Subscription that can fetch messages.
// See important note in Subscribe()
PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
}

// JetStreamContext allows JetStream messaging and stream management.
type JetStreamContext interface {
JetStream
JetStreamManager
}

// js is an internal struct from a JetStreamContext.
type js struct {
nc *Conn
Expand Down Expand Up @@ -320,6 +321,16 @@ const (
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgRollup = "Nats-Rollup"
)

// MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested.
const MsgSize = "Nats-Msg-Size"

// Rollups, can be subject only or all messages.
const (
MsgRollupSubject = "sub"
MsgRollupAll = "all"
)

// PublishMsg publishes a Msg to a stream from JetStream.
Expand Down Expand Up @@ -667,10 +678,14 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if m.Reply != _EMPTY_ {
return nil, errors.New("nats: reply subject should be empty")
}
reply := m.Reply
m.Reply = js.newAsyncReply()
defer func() { m.Reply = reply }()

if m.Reply == _EMPTY_ {
return nil, errors.New("nats: error creating async reply handler")
}

id := m.Reply[aReplyPreLen:]
paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf)
Expand All @@ -683,7 +698,6 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
return nil, errors.New("nats: stalled with too many outstanding async published messages")
}
}

if err := js.nc.PublishMsg(m); err != nil {
js.clearPAF(id)
return nil, err
Expand Down Expand Up @@ -841,6 +855,7 @@ type ConsumerConfig struct {
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down Expand Up @@ -2120,6 +2135,14 @@ func DeliverSubject(subject string) SubOpt {
})
}

// HeadersOnly() will instruct the consumer to only deleiver headers and no payloads.
func HeadersOnly() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.HeadersOnly = true
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down