Skip to content

Commit

Permalink
[ADDED] KeyValue and ObjectStore support for JetStream.
Browse files Browse the repository at this point in the history
Also:
- Fixed message reply in PublishMsgAsync
- Ability to seal streams
- Ability for consumer to get message headers only, no msg payload
- GetLastMsg and purgeStream by subject

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored and kozlovic committed Oct 7, 2021
1 parent 4e70dbe commit 141643a
Show file tree
Hide file tree
Showing 12 changed files with 2,861 additions and 105 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6
github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
19 changes: 5 additions & 14 deletions go_test.sum
Expand Up @@ -15,33 +15,24 @@ github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEE
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 h1:TYI6K487xhbbpKjz4gIIVBWL6l2gFI3JHu/N0XySwRY=
github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6/go.mod h1:xZLDZ6cRUu9FCh7+mKXGEy16O66CdWVxttxNIiUuNCk=
github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/jwt/v2 v2.1.0 h1:1UbfD5g1xTdWmSeRV8bh/7u+utTiBsRtWhLl1PixZp4=
github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da h1:0snsE4pD2VKIsFiRMRkHFY+SJZVbT7/eZJ1lOt5XuLA=
github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80=
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
181 changes: 102 additions & 79 deletions js.go
Expand Up @@ -30,6 +30,85 @@ import (
"github.com/nats-io/nuid"
)

// JetStream allows persistent messaging through JetStream.
type JetStream interface {
// Publish publishes a message to JetStream.
Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)

// PublishMsg publishes a Msg to JetStream.
PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)

// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
// The data should not be changed until the PubAckFuture has been processed.
PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)

// PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture.
// The message should not be changed until the PubAckFuture has been processed.
PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)

// PublishAsyncPending returns the number of async publishes outstanding for this context.
PublishAsyncPending() int

// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
PublishAsyncComplete() <-chan struct{}

// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
// you can provide the stream name with nats.BindStream().
// If no stream name is specified, the library will attempt to figure out which
// stream the subscription is for. See important notes below for more details.
//
// IMPORTANT NOTES:
// * If none of the options Bind() nor Durable() are specified, the library will
// send a request to the server to create an ephemeral JetStream consumer,
// which will be deleted after an Unsubscribe() or Drain(), or automatically
// by the server after a short period of time after the NATS subscription is
// gone.
// * If Durable() option is specified, the library will attempt to lookup a JetStream
// consumer with this name, and if found, will bind to it and not attempt to
// delete it. However, if not found, the library will send a request to create
// such durable JetStream consumer. The library will delete the JetStream consumer
// after an Unsubscribe() or Drain().
// * If Bind() option is provided, the library will attempt to lookup the
// consumer with the given name, and if successful, bind to it. If the lookup fails,
// then the Subscribe() call will return an error.
Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

// SubscribeSync creates a Subscription that can be used to process messages synchronously.
// See important note in Subscribe()
SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)

// ChanSubscribe creates channel based Subscription.
// See important note in Subscribe()
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// ChanQueueSubscribe creates channel based Subscription with a queue group.
// See important note in QueueSubscribe()
ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// QueueSubscribe creates a Subscription with a queue group.
// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
// See important note in Subscribe()
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
// See important note in QueueSubscribe()
QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)

// PullSubscribe creates a Subscription that can fetch messages.
// See important note in Subscribe()
PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
}

// JetStreamContext allows JetStream messaging and stream management.
type JetStreamContext interface {
JetStream
JetStreamManager
KeyValueManager
ObjectStoreManager
}

// Request API subjects for JetStream.
const (
// defaultAPIPrefix is the default prefix for the JetStream API.
Expand Down Expand Up @@ -110,84 +189,6 @@ const (
jsCtrlFC = 2
)

// JetStream allows persistent messaging through JetStream.
type JetStream interface {
// Publish publishes a message to JetStream.
Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)

// PublishMsg publishes a Msg to JetStream.
PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)

// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
// The data should not be changed until the PubAckFuture has been processed.
PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)

// PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture.
// The message should not be changed until the PubAckFuture has been processed.
PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)

// PublishAsyncPending returns the number of async publishes outstanding for this context.
PublishAsyncPending() int

// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
PublishAsyncComplete() <-chan struct{}

// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
// you can provide the stream name with nats.BindStream().
// If no stream name is specified, the library will attempt to figure out which
// stream the subscription is for. See important notes below for more details.
//
// IMPORTANT NOTES:
// * If none of the options Bind() nor Durable() are specified, the library will
// send a request to the server to create an ephemeral JetStream consumer,
// which will be deleted after an Unsubscribe() or Drain(), or automatically
// by the server after a short period of time after the NATS subscription is
// gone.
// * If Durable() option is specified, the library will attempt to lookup a JetStream
// consumer with this name, and if found, will bind to it and not attempt to
// delete it. However, if not found, the library will send a request to create
// such durable JetStream consumer. The library will delete the JetStream consumer
// after an Unsubscribe() or Drain().
// * If Bind() option is provided, the library will attempt to lookup the
// consumer with the given name, and if successful, bind to it. If the lookup fails,
// then the Subscribe() call will return an error.
Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

// SubscribeSync creates a Subscription that can be used to process messages synchronously.
// See important note in Subscribe()
SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)

// ChanSubscribe creates channel based Subscription.
// See important note in Subscribe()
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// ChanQueueSubscribe creates channel based Subscription with a queue group.
// See important note in QueueSubscribe()
ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// QueueSubscribe creates a Subscription with a queue group.
// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
// In that case, the queue name cannot contain dots ".", same restriction that is applied to a durable name.
// See important note in Subscribe()
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
// See important note in QueueSubscribe()
QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)

// PullSubscribe creates a Subscription that can fetch messages.
// See important note in Subscribe()
PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
}

// JetStreamContext allows JetStream messaging and stream management.
type JetStreamContext interface {
JetStream
JetStreamManager
}

// js is an internal struct from a JetStreamContext.
type js struct {
nc *Conn
Expand Down Expand Up @@ -320,6 +321,16 @@ const (
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgRollup = "Nats-Rollup"
)

// MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested.
const MsgSize = "Nats-Msg-Size"

// Rollups, can be subject only or all messages.
const (
MsgRollupSubject = "sub"
MsgRollupAll = "all"
)

// PublishMsg publishes a Msg to a stream from JetStream.
Expand Down Expand Up @@ -667,10 +678,14 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if m.Reply != _EMPTY_ {
return nil, errors.New("nats: reply subject should be empty")
}
reply := m.Reply
m.Reply = js.newAsyncReply()
defer func() { m.Reply = reply }()

if m.Reply == _EMPTY_ {
return nil, errors.New("nats: error creating async reply handler")
}

id := m.Reply[aReplyPreLen:]
paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf)
Expand All @@ -683,7 +698,6 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
return nil, errors.New("nats: stalled with too many outstanding async published messages")
}
}

if err := js.nc.PublishMsg(m); err != nil {
js.clearPAF(id)
return nil, err
Expand Down Expand Up @@ -841,6 +855,7 @@ type ConsumerConfig struct {
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down Expand Up @@ -2120,6 +2135,14 @@ func DeliverSubject(subject string) SubOpt {
})
}

// HeadersOnly() will instruct the consumer to only deleiver headers and no payloads.
func HeadersOnly() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.HeadersOnly = true
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down

0 comments on commit 141643a

Please sign in to comment.