Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Stream RePublish and some ConsumerConfig new fields #980

Merged
merged 1 commit into from May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.8.2
github.com/nats-io/nats-server/v2 v2.8.4-0.20220524225320-752c0adec50d
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
6 changes: 3 additions & 3 deletions go_test.sum
Expand Up @@ -15,9 +15,9 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.2 h1:5m1VytMEbZx0YINvKY+X2gXdLNwP43uLXnFRwz8j8KE=
github.com/nats-io/nats-server/v2 v2.8.2/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4=
github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.8.4-0.20220524225320-752c0adec50d h1:tSc2SvfJE24lwT5750qLG3sW0ZkVhK1BjFlv7IfO1SY=
github.com/nats-io/nats-server/v2 v2.8.4-0.20220524225320-752c0adec50d/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
18 changes: 16 additions & 2 deletions js.go
Expand Up @@ -373,6 +373,13 @@ const (
MsgRollup = "Nats-Rollup"
)

// Headers for republished messages.
const (
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSLastSequence = "Nats-Last-Sequence"
)

// MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested.
const MsgSize = "Nats-Msg-Size"

Expand Down Expand Up @@ -961,8 +968,6 @@ func (d nakDelay) configureAck(opts *ackOpts) error {
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
Expand All @@ -984,8 +989,17 @@ type ConsumerConfig struct {
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`

// Push based consumers.
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`

// Ephemeral inactivity threshold.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

// Generally inherited by parent stream and other markers, now can be configured directly.
Replicas int `json:"num_replicas"`
// Force memory storage.
MemoryStorage bool `json:"mem_storage,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down
9 changes: 9 additions & 0 deletions jsm.go
Expand Up @@ -100,6 +100,15 @@ type StreamConfig struct {
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`

// Allow republish of the message after being sequenced and stored.
RePublish *SubjectMapping `json:"republish,omitempty"`
}

// SubjectMapping allows a source subject to be mapped to a destination subject for republishing.
type SubjectMapping struct {
Source string `json:"src,omitempty"`
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
Destination string `json:"dest"`
}

// Placement is used to guide placement of streams in clustered JetStream.
Expand Down
111 changes: 111 additions & 0 deletions test/js_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"net"
"os"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -6617,3 +6618,113 @@ func TestJetStreamClusterStreamLeaderChangeClientErr(t *testing.T) {
}
})
}

func TestJetStreamConsumerConfigReplicasAndMemStorage(t *testing.T) {
withJSCluster(t, "CR", 3, func(t *testing.T, nodes ...*jsServer) {
nc, js := jsClient(t, nodes[0].Server)
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{
Name: "CR",
Subjects: []string{"foo"},
Replicas: 3,
}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

// We can't really check if the consumer ends-up with memory storage or not.
// We are simply going to create a NATS subscription on the request subject
// and make sure that the request contains "mem_storage:true".
sub, err := nc.SubscribeSync("$JS.API.CONSUMER.DURABLE.CREATE.CR.dur")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

ci, err := js.AddConsumer("CR", &nats.ConsumerConfig{
Durable: "dur",
DeliverSubject: "bar",
Replicas: 1,
MemoryStorage: true,
})
if err != nil {
t.Fatalf("Error adding consumer: %v", err)
}
if n := len(ci.Cluster.Replicas); n > 0 {
t.Fatalf("Expected replicas to be 1, got %+v", ci.Cluster)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on next msg: %v", err)
}
if str := string(msg.Data); !strings.Contains(str, "mem_storage\":true") {
t.Fatalf("Does not look like the request asked for memory storage: %s", str)
}
})
}

func TestJetStreamRePublish(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{
Name: "RP",
Storage: nats.MemoryStorage,
Subjects: []string{"foo", "bar", "baz"},
RePublish: &nats.SubjectMapping{
Source: ">",
Destination: "RP.>",
},
}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

sub, err := nc.SubscribeSync("RP.>")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

msg, toSend := []byte("OK TO REPUBLISH?"), 100
for i := 0; i < toSend; i++ {
js.Publish("foo", msg)
js.Publish("bar", msg)
js.Publish("baz", msg)
}

lseq := map[string]int{
"foo": 0,
"bar": 0,
"baz": 0,
}

for i := 1; i <= toSend; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on next msg: %v", err)
}
// Grab info from Header
stream := m.Header.Get(nats.JSStream)
if stream != "RP" {
t.Fatalf("Unexpected header: %+v", m.Header)
}
// Make sure sequence is correct.
seq, err := strconv.Atoi(m.Header.Get(nats.JSSequence))
if err != nil {
t.Fatalf("Error decoding sequence for %s", m.Header.Get(nats.JSSequence))
}
if seq != i {
t.Fatalf("Expected sequence to be %v, got %v", i, seq)
}
// Make sure last sequence matches last seq we received on this subject.
last, err := strconv.Atoi(m.Header.Get(nats.JSLastSequence))
if err != nil {
t.Fatalf("Error decoding last sequence for %s", m.Header.Get(nats.JSLastSequence))
}
if last != lseq[m.Subject] {
t.Fatalf("Expected last sequence to be %v, got %v", lseq[m.Subject], last)
}
lseq[m.Subject] = seq
}
}