Skip to content

Commit

Permalink
Merge pull request #980 from nats-io/js_stream_republish_and_cons_con…
Browse files Browse the repository at this point in the history
…fig_updates

[ADDED] Stream RePublish and some ConsumerConfig new fields
  • Loading branch information
kozlovic committed May 25, 2022
2 parents 144a3b2 + 9aaf72b commit 4e4f318
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.8.2
github.com/nats-io/nats-server/v2 v2.8.4-0.20220524225320-752c0adec50d
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
6 changes: 3 additions & 3 deletions go_test.sum
Expand Up @@ -15,9 +15,9 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.2 h1:5m1VytMEbZx0YINvKY+X2gXdLNwP43uLXnFRwz8j8KE=
github.com/nats-io/nats-server/v2 v2.8.2/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4=
github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.8.4-0.20220524225320-752c0adec50d h1:tSc2SvfJE24lwT5750qLG3sW0ZkVhK1BjFlv7IfO1SY=
github.com/nats-io/nats-server/v2 v2.8.4-0.20220524225320-752c0adec50d/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
18 changes: 16 additions & 2 deletions js.go
Expand Up @@ -373,6 +373,13 @@ const (
MsgRollup = "Nats-Rollup"
)

// Headers for republished messages.
const (
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSLastSequence = "Nats-Last-Sequence"
)

// MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested.
const MsgSize = "Nats-Msg-Size"

Expand Down Expand Up @@ -961,8 +968,6 @@ func (d nakDelay) configureAck(opts *ackOpts) error {
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
Expand All @@ -984,8 +989,17 @@ type ConsumerConfig struct {
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`

// Push based consumers.
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`

// Ephemeral inactivity threshold.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

// Generally inherited by parent stream and other markers, now can be configured directly.
Replicas int `json:"num_replicas"`
// Force memory storage.
MemoryStorage bool `json:"mem_storage,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down
9 changes: 9 additions & 0 deletions jsm.go
Expand Up @@ -100,6 +100,15 @@ type StreamConfig struct {
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`

// Allow republish of the message after being sequenced and stored.
RePublish *SubjectMapping `json:"republish,omitempty"`
}

// SubjectMapping allows a source subject to be mapped to a destination subject for republishing.
type SubjectMapping struct {
Source string `json:"src,omitempty"`
Destination string `json:"dest"`
}

// Placement is used to guide placement of streams in clustered JetStream.
Expand Down
111 changes: 111 additions & 0 deletions test/js_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"net"
"os"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -6617,3 +6618,113 @@ func TestJetStreamClusterStreamLeaderChangeClientErr(t *testing.T) {
}
})
}

func TestJetStreamConsumerConfigReplicasAndMemStorage(t *testing.T) {
withJSCluster(t, "CR", 3, func(t *testing.T, nodes ...*jsServer) {
nc, js := jsClient(t, nodes[0].Server)
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{
Name: "CR",
Subjects: []string{"foo"},
Replicas: 3,
}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

// We can't really check if the consumer ends-up with memory storage or not.
// We are simply going to create a NATS subscription on the request subject
// and make sure that the request contains "mem_storage:true".
sub, err := nc.SubscribeSync("$JS.API.CONSUMER.DURABLE.CREATE.CR.dur")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

ci, err := js.AddConsumer("CR", &nats.ConsumerConfig{
Durable: "dur",
DeliverSubject: "bar",
Replicas: 1,
MemoryStorage: true,
})
if err != nil {
t.Fatalf("Error adding consumer: %v", err)
}
if n := len(ci.Cluster.Replicas); n > 0 {
t.Fatalf("Expected replicas to be 1, got %+v", ci.Cluster)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on next msg: %v", err)
}
if str := string(msg.Data); !strings.Contains(str, "mem_storage\":true") {
t.Fatalf("Does not look like the request asked for memory storage: %s", str)
}
})
}

func TestJetStreamRePublish(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{
Name: "RP",
Storage: nats.MemoryStorage,
Subjects: []string{"foo", "bar", "baz"},
RePublish: &nats.SubjectMapping{
Source: ">",
Destination: "RP.>",
},
}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

sub, err := nc.SubscribeSync("RP.>")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

msg, toSend := []byte("OK TO REPUBLISH?"), 100
for i := 0; i < toSend; i++ {
js.Publish("foo", msg)
js.Publish("bar", msg)
js.Publish("baz", msg)
}

lseq := map[string]int{
"foo": 0,
"bar": 0,
"baz": 0,
}

for i := 1; i <= toSend; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on next msg: %v", err)
}
// Grab info from Header
stream := m.Header.Get(nats.JSStream)
if stream != "RP" {
t.Fatalf("Unexpected header: %+v", m.Header)
}
// Make sure sequence is correct.
seq, err := strconv.Atoi(m.Header.Get(nats.JSSequence))
if err != nil {
t.Fatalf("Error decoding sequence for %s", m.Header.Get(nats.JSSequence))
}
if seq != i {
t.Fatalf("Expected sequence to be %v, got %v", i, seq)
}
// Make sure last sequence matches last seq we received on this subject.
last, err := strconv.Atoi(m.Header.Get(nats.JSLastSequence))
if err != nil {
t.Fatalf("Error decoding last sequence for %s", m.Header.Get(nats.JSLastSequence))
}
if last != lseq[m.Subject] {
t.Fatalf("Expected last sequence to be %v, got %v", lseq[m.Subject], last)
}
lseq[m.Subject] = seq
}
}

0 comments on commit 4e4f318

Please sign in to comment.