Skip to content

Commit

Permalink
Merge pull request #3814 from nats-io/jnm/streamsourcetransform
Browse files Browse the repository at this point in the history
[ADDED] Stream subject mapping transforms
  • Loading branch information
derekcollison committed Jan 26, 2023
2 parents 7c0c85e + ddce3ce commit c2d3d9c
Show file tree
Hide file tree
Showing 9 changed files with 999 additions and 1,324 deletions.
516 changes: 12 additions & 504 deletions server/accounts.go

Large diffs are not rendered by default.

128 changes: 0 additions & 128 deletions server/accounts_test.go
Expand Up @@ -16,10 +16,8 @@ package server
import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -48,60 +46,6 @@ func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
return s, f, b
}

func TestPlaceHolderIndex(t *testing.T) {
testString := "$1"
transformType, indexes, nbPartitions, _, err := indexPlaceHolders(testString)
var position int32

if err != nil || transformType != Wildcard || len(indexes) != 1 || indexes[0] != 1 || nbPartitions != -1 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{partition(10,1,2,3)}}"

transformType, indexes, nbPartitions, _, err = indexPlaceHolders(testString)

if err != nil || transformType != Partition || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{ Partition (10,1,2,3) }}"

transformType, indexes, nbPartitions, _, err = indexPlaceHolders(testString)

if err != nil || transformType != Partition || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{wildcard(2)}}"
transformType, indexes, nbPartitions, _, err = indexPlaceHolders(testString)

if err != nil || transformType != Wildcard || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{SplitFromLeft(2,1)}}"
transformType, indexes, position, _, err = indexPlaceHolders(testString)

if err != nil || transformType != SplitFromLeft || len(indexes) != 1 || indexes[0] != 2 || position != 1 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{SplitFromRight(3,2)}}"
transformType, indexes, position, _, err = indexPlaceHolders(testString)

if err != nil || transformType != SplitFromRight || len(indexes) != 1 || indexes[0] != 3 || position != 2 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{SliceFromLeft(2,2)}}"
transformType, indexes, sliceSize, _, err := indexPlaceHolders(testString)

if err != nil || transformType != SliceFromLeft || len(indexes) != 1 || indexes[0] != 2 || sliceSize != 2 {
t.Fatalf("Error parsing %s", testString)
}
}

func TestRegisterDuplicateAccounts(t *testing.T) {
s, _, _ := simpleAccountServer(t)
if _, err := s.RegisterAccount("$foo"); err == nil {
Expand Down Expand Up @@ -3264,78 +3208,6 @@ func TestSamplingHeader(t *testing.T) {
test(false, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00"}})
}

func TestSubjectTransforms(t *testing.T) {
shouldErr := func(src, dest string) {
t.Helper()
if _, err := newTransform(src, dest); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) {
t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest)
}
}

shouldErr("foo.*.*", "bar.$2") // Must place all pwcs.

// Must be valid subjects.
shouldErr("foo", "")
shouldErr("foo..", "bar")

// Wildcards are allowed in src, but must be matched by token placements on the other side.
// e.g. foo.* -> bar.$1.
// Need to have as many pwcs as placements on other side.
shouldErr("foo.*", "bar.*")
shouldErr("foo.*", "bar.$2") // Bad pwc token identifier
shouldErr("foo.*", "bar.$1.>") // fwcs have to match.
shouldErr("foo.>", "bar.baz") // fwcs have to match.
shouldErr("foo.*.*", "bar.$2") // Must place all pwcs.
shouldErr("foo.*", "foo.$foo") // invalid $ value
shouldErr("foo.*", "foo.{{wildcard(2)}}") // Mapping function being passed an out of range wildcard index
shouldErr("foo.*", "foo.{{unimplemented(1)}}") // Mapping trying to use an unknown mapping function
shouldErr("foo.*", "foo.{{partition(10)}}") // Not enough arguments passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard(foo)}}") // Invalid argument passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard()}}") // Not enough arguments passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard(1,2)}}") // Too many arguments passed to the mapping function
shouldErr("foo.*", "foo.{{ wildcard5) }}") // Bad mapping function
shouldErr("foo.*", "foo.{{splitLeft(2,2}}") // arg out of range

shouldBeOK := func(src, dest string) *transform {
t.Helper()
tr, err := newTransform(src, dest)
if err != nil {
t.Fatalf("Got an error %v for src=%q and dest=%q", err, src, dest)
}
return tr
}

shouldBeOK("foo", "bar")
shouldBeOK("foo.*.bar.*.baz", "req.$2.$1")
shouldBeOK("baz.>", "mybaz.>")
shouldBeOK("*", "{{splitfromleft(1,1)}}")

shouldMatch := func(src, dest, sample, expected string) {
t.Helper()
tr := shouldBeOK(src, dest)
s, err := tr.Match(sample)
if err != nil {
t.Fatalf("Got an error %v when expecting a match for %q to %q", err, sample, expected)
}
if s != expected {
t.Fatalf("Dest does not match what was expected. Got %q, expected %q", s, expected)
}
}

shouldMatch("foo", "bar", "foo", "bar")
shouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A")
shouldMatch("baz.>", "my.pre.>", "baz.1.2.3", "my.pre.1.2.3")
shouldMatch("baz.>", "foo.bar.>", "baz.1.2.3", "foo.bar.1.2.3")
shouldMatch("*", "foo.bar.$1", "foo", "foo.bar.foo")
shouldMatch("*", "{{splitfromleft(1,3)}}", "12345", "123.45")
shouldMatch("*", "{{SplitFromRight(1,3)}}", "12345", "12.345")
shouldMatch("*", "{{SliceFromLeft(1,3)}}", "1234567890", "123.456.789.0")
shouldMatch("*", "{{SliceFromRight(1,3)}}", "1234567890", "1.234.567.890")
shouldMatch("*", "{{split(1,-)}}", "-abc-def--ghi-", "abc.def.ghi")
shouldMatch("*", "{{split(1,-)}}", "abc-def--ghi-", "abc.def.ghi")
shouldMatch("*.*", "{{split(2,-)}}.{{splitfromleft(1,2)}}", "foo.-abc-def--ghij-", "abc.def.ghij.fo.o") // combo + checks split for multiple instance of deliminator and deliminator being at the start or end
}

func TestAccountSystemPermsWithGlobalAccess(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
Expand Down
14 changes: 6 additions & 8 deletions server/client.go
Expand Up @@ -2754,10 +2754,8 @@ func (c *client) addShadowSub(sub *subscription, ime *ime) (*subscription, error
if ime.overlapSubj != _EMPTY_ {
s = ime.overlapSubj
}
subj, err := im.rtr.transformSubject(s)
if err != nil {
return nil, err
}
subj := im.rtr.TransformSubject(s)

nsub.subject = []byte(subj)
} else if !im.usePub || (im.usePub && ime.overlapSubj != _EMPTY_) || !ime.dyn {
if ime.overlapSubj != _EMPTY_ {
Expand Down Expand Up @@ -3035,7 +3033,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac
// Remap subject if its a shadow subscription, treat like a normal client.
if rt.sub.im != nil {
if rt.sub.im.tr != nil {
to, _ := rt.sub.im.tr.transformSubject(string(subj))
to := rt.sub.im.tr.TransformSubject(string(subj))
subj = []byte(to)
} else if !rt.sub.im.usePub {
subj = []byte(rt.sub.im.to)
Expand Down Expand Up @@ -3991,7 +3989,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt

if si.tr != nil {
// FIXME(dlc) - This could be slow, may want to look at adding cache to bare transforms?
to, _ = si.tr.transformSubject(subject)
to = si.tr.TransformSubject(subject)
} else if si.usePub {
to = subject
}
Expand Down Expand Up @@ -4280,7 +4278,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
if sub.im.tr != nil {
to, _ := sub.im.tr.transformSubject(string(subject))
to := sub.im.tr.TransformSubject(string(subject))
dsubj = append(_dsubj[:0], to...)
} else if sub.im.usePub {
dsubj = append(_dsubj[:0], subj...)
Expand Down Expand Up @@ -4427,7 +4425,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
if sub.im.tr != nil {
to, _ := sub.im.tr.transformSubject(string(subject))
to := sub.im.tr.TransformSubject(string(subject))
dsubj = append(_dsubj[:0], to...)
} else if sub.im.usePub {
dsubj = append(_dsubj[:0], subj...)
Expand Down
33 changes: 2 additions & 31 deletions server/consumer.go
Expand Up @@ -466,17 +466,8 @@ func checkConsumerCfg(
}
}

// As best we can make sure the filtered subject is valid.
if config.FilterSubject != _EMPTY_ {
subjects := copyStrings(cfg.Subjects)
// explicitly skip validFilteredSubject when recovering
hasExt := isRecovering
if !isRecovering {
subjects, hasExt = gatherSourceMirrorSubjects(subjects, cfg, acc)
}
if !hasExt && !validFilteredSubject(config.FilterSubject, subjects) {
return NewJSConsumerFilterNotSubsetError()
}
if config.FilterSubject != _EMPTY_ && !IsValidSubject(config.FilterSubject) {
return NewJSStreamInvalidConfigError(ErrBadSubject)
}

// Helper function to formulate similar errors.
Expand Down Expand Up @@ -4272,26 +4263,6 @@ func deliveryFormsCycle(cfg *StreamConfig, deliverySubject string) bool {
return false
}

// Check that the filtered subject is valid given a set of stream subjects.
func validFilteredSubject(filteredSubject string, subjects []string) bool {
if !IsValidSubject(filteredSubject) {
return false
}
hasWC := subjectHasWildcard(filteredSubject)

for _, subject := range subjects {
if subjectIsSubsetMatch(filteredSubject, subject) {
return true
}
// If we have a wildcard as the filtered subject check to see if we are
// a wider scope but do match a subject.
if hasWC && subjectIsSubsetMatch(subject, filteredSubject) {
return true
}
}
return false
}

// switchToEphemeral is called on startup when recovering ephemerals.
func (o *consumer) switchToEphemeral() {
o.mu.Lock()
Expand Down
41 changes: 2 additions & 39 deletions server/jetstream_cluster_1_test.go
Expand Up @@ -5544,39 +5544,6 @@ func TestJetStreamClusterMirrorAndSourcesClusterRestart(t *testing.T) {
})
}

func TestJetStreamClusterSourceFilterSubjectUpdateFail(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSR", 3)
defer c.shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Origin
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 2,
})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "S",
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "notthere"}},
Replicas: 2,
})
require_Error(t, err)
require_Equal(t, err.Error(), "nats: source 'TEST' filter subject 'notthere' does not overlap with any origin stream subject")

_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Mirror: &nats.StreamSource{Name: "TEST", FilterSubject: "notthere"},
Replicas: 2,
})
require_Error(t, err)
require_Equal(t, err.Error(), "nats: mirror 'TEST' filter subject 'notthere' does not overlap with any origin stream subject")
}

func TestJetStreamClusterMirrorAndSourcesFilteredConsumers(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterMirrorSourceImportsTempl, "MS5", 5)
defer c.shutdown()
Expand Down Expand Up @@ -5625,9 +5592,8 @@ func TestJetStreamClusterMirrorAndSourcesFilteredConsumers(t *testing.T) {
createConsumer("M", "foo")
createConsumer("M", "bar")
createConsumer("M", "baz.foo")
expectFail("M", "baz")
expectFail("M", "baz.1.2")
expectFail("M", "apple")
expectFail("M", ".")
expectFail("M", ">.foo")

// Make sure wider scoped subjects work as well.
createConsumer("M", "*")
Expand All @@ -5652,9 +5618,6 @@ func TestJetStreamClusterMirrorAndSourcesFilteredConsumers(t *testing.T) {

createConsumer("S", "foo.1")
createConsumer("S", "bar.1")
expectFail("S", "baz")
expectFail("S", "baz.1")
expectFail("S", "apple")

// Now cross account stuff.
nc2, js2 := jsClientConnect(t, s, nats.UserInfo("rip", "pass"))
Expand Down

0 comments on commit c2d3d9c

Please sign in to comment.