diff --git a/go_test.mod b/go_test.mod index 68d2ac483..20e3f0c97 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.17 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.8.2 + github.com/nats-io/nats-server/v2 v2.8.4-0.20220524225320-752c0adec50d github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 8b79f477c..f14df0670 100644 --- a/go_test.sum +++ b/go_test.sum @@ -15,9 +15,9 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I= github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.8.2 h1:5m1VytMEbZx0YINvKY+X2gXdLNwP43uLXnFRwz8j8KE= -github.com/nats-io/nats-server/v2 v2.8.2/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4= -github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats-server/v2 v2.8.4-0.20220524225320-752c0adec50d h1:tSc2SvfJE24lwT5750qLG3sW0ZkVhK1BjFlv7IfO1SY= +github.com/nats-io/nats-server/v2 v2.8.4-0.20220524225320-752c0adec50d/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= +github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/js.go b/js.go index 1259743ec..dafa53ab5 100644 --- a/js.go +++ b/js.go @@ -373,6 +373,13 @@ const ( MsgRollup = "Nats-Rollup" ) +// Headers for republished messages. +const ( + JSStream = "Nats-Stream" + JSSequence = "Nats-Sequence" + JSLastSequence = "Nats-Last-Sequence" +) + // MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested. const MsgSize = "Nats-Msg-Size" @@ -961,8 +968,6 @@ func (d nakDelay) configureAck(opts *ackOpts) error { type ConsumerConfig struct { Durable string `json:"durable_name,omitempty"` Description string `json:"description,omitempty"` - DeliverSubject string `json:"deliver_subject,omitempty"` - DeliverGroup string `json:"deliver_group,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` @@ -984,8 +989,17 @@ type ConsumerConfig struct { MaxRequestBatch int `json:"max_batch,omitempty"` MaxRequestExpires time.Duration `json:"max_expires,omitempty"` + // Push based consumers. + DeliverSubject string `json:"deliver_subject,omitempty"` + DeliverGroup string `json:"deliver_group,omitempty"` + // Ephemeral inactivity threshold. InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` + + // Generally inherited by parent stream and other markers, now can be configured directly. + Replicas int `json:"num_replicas"` + // Force memory storage. + MemoryStorage bool `json:"mem_storage,omitempty"` } // ConsumerInfo is the info from a JetStream consumer. diff --git a/jsm.go b/jsm.go index a8c560b65..6bfff6732 100644 --- a/jsm.go +++ b/jsm.go @@ -100,6 +100,15 @@ type StreamConfig struct { DenyDelete bool `json:"deny_delete,omitempty"` DenyPurge bool `json:"deny_purge,omitempty"` AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` + + // Allow republish of the message after being sequenced and stored. + RePublish *SubjectMapping `json:"republish,omitempty"` +} + +// SubjectMapping allows a source subject to be mapped to a destination subject for republishing. +type SubjectMapping struct { + Source string `json:"src,omitempty"` + Destination string `json:"dest"` } // Placement is used to guide placement of streams in clustered JetStream. diff --git a/test/js_test.go b/test/js_test.go index 01d6b8d55..1fb2a3074 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -23,6 +23,7 @@ import ( "net" "os" "reflect" + "strconv" "strings" "sync" "sync/atomic" @@ -6617,3 +6618,113 @@ func TestJetStreamClusterStreamLeaderChangeClientErr(t *testing.T) { } }) } + +func TestJetStreamConsumerConfigReplicasAndMemStorage(t *testing.T) { + withJSCluster(t, "CR", 3, func(t *testing.T, nodes ...*jsServer) { + nc, js := jsClient(t, nodes[0].Server) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{ + Name: "CR", + Subjects: []string{"foo"}, + Replicas: 3, + }); err != nil { + t.Fatalf("Error adding stream: %v", err) + } + + // We can't really check if the consumer ends-up with memory storage or not. + // We are simply going to create a NATS subscription on the request subject + // and make sure that the request contains "mem_storage:true". + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.DURABLE.CREATE.CR.dur") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + ci, err := js.AddConsumer("CR", &nats.ConsumerConfig{ + Durable: "dur", + DeliverSubject: "bar", + Replicas: 1, + MemoryStorage: true, + }) + if err != nil { + t.Fatalf("Error adding consumer: %v", err) + } + if n := len(ci.Cluster.Replicas); n > 0 { + t.Fatalf("Expected replicas to be 1, got %+v", ci.Cluster) + } + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on next msg: %v", err) + } + if str := string(msg.Data); !strings.Contains(str, "mem_storage\":true") { + t.Fatalf("Does not look like the request asked for memory storage: %s", str) + } + }) +} + +func TestJetStreamRePublish(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{ + Name: "RP", + Storage: nats.MemoryStorage, + Subjects: []string{"foo", "bar", "baz"}, + RePublish: &nats.SubjectMapping{ + Source: ">", + Destination: "RP.>", + }, + }); err != nil { + t.Fatalf("Error adding stream: %v", err) + } + + sub, err := nc.SubscribeSync("RP.>") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + msg, toSend := []byte("OK TO REPUBLISH?"), 100 + for i := 0; i < toSend; i++ { + js.Publish("foo", msg) + js.Publish("bar", msg) + js.Publish("baz", msg) + } + + lseq := map[string]int{ + "foo": 0, + "bar": 0, + "baz": 0, + } + + for i := 1; i <= toSend; i++ { + m, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on next msg: %v", err) + } + // Grab info from Header + stream := m.Header.Get(nats.JSStream) + if stream != "RP" { + t.Fatalf("Unexpected header: %+v", m.Header) + } + // Make sure sequence is correct. + seq, err := strconv.Atoi(m.Header.Get(nats.JSSequence)) + if err != nil { + t.Fatalf("Error decoding sequence for %s", m.Header.Get(nats.JSSequence)) + } + if seq != i { + t.Fatalf("Expected sequence to be %v, got %v", i, seq) + } + // Make sure last sequence matches last seq we received on this subject. + last, err := strconv.Atoi(m.Header.Get(nats.JSLastSequence)) + if err != nil { + t.Fatalf("Error decoding last sequence for %s", m.Header.Get(nats.JSLastSequence)) + } + if last != lseq[m.Subject] { + t.Fatalf("Expected last sequence to be %v, got %v", lseq[m.Subject], last) + } + lseq[m.Subject] = seq + } +}