Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Stream subject mapping transforms #3814

Merged
merged 4 commits into from Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
516 changes: 12 additions & 504 deletions server/accounts.go

Large diffs are not rendered by default.

128 changes: 0 additions & 128 deletions server/accounts_test.go
Expand Up @@ -16,10 +16,8 @@ package server
import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -48,60 +46,6 @@ func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
return s, f, b
}

func TestPlaceHolderIndex(t *testing.T) {
testString := "$1"
transformType, indexes, nbPartitions, _, err := indexPlaceHolders(testString)
var position int32

if err != nil || transformType != Wildcard || len(indexes) != 1 || indexes[0] != 1 || nbPartitions != -1 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{partition(10,1,2,3)}}"

transformType, indexes, nbPartitions, _, err = indexPlaceHolders(testString)

if err != nil || transformType != Partition || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{ Partition (10,1,2,3) }}"

transformType, indexes, nbPartitions, _, err = indexPlaceHolders(testString)

if err != nil || transformType != Partition || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{wildcard(2)}}"
transformType, indexes, nbPartitions, _, err = indexPlaceHolders(testString)

if err != nil || transformType != Wildcard || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{SplitFromLeft(2,1)}}"
transformType, indexes, position, _, err = indexPlaceHolders(testString)

if err != nil || transformType != SplitFromLeft || len(indexes) != 1 || indexes[0] != 2 || position != 1 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{SplitFromRight(3,2)}}"
transformType, indexes, position, _, err = indexPlaceHolders(testString)

if err != nil || transformType != SplitFromRight || len(indexes) != 1 || indexes[0] != 3 || position != 2 {
t.Fatalf("Error parsing %s", testString)
}

testString = "{{SliceFromLeft(2,2)}}"
transformType, indexes, sliceSize, _, err := indexPlaceHolders(testString)

if err != nil || transformType != SliceFromLeft || len(indexes) != 1 || indexes[0] != 2 || sliceSize != 2 {
t.Fatalf("Error parsing %s", testString)
}
}

func TestRegisterDuplicateAccounts(t *testing.T) {
s, _, _ := simpleAccountServer(t)
if _, err := s.RegisterAccount("$foo"); err == nil {
Expand Down Expand Up @@ -3264,78 +3208,6 @@ func TestSamplingHeader(t *testing.T) {
test(false, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00"}})
}

func TestSubjectTransforms(t *testing.T) {
shouldErr := func(src, dest string) {
t.Helper()
if _, err := newTransform(src, dest); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) {
t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest)
}
}

shouldErr("foo.*.*", "bar.$2") // Must place all pwcs.

// Must be valid subjects.
shouldErr("foo", "")
shouldErr("foo..", "bar")

// Wildcards are allowed in src, but must be matched by token placements on the other side.
// e.g. foo.* -> bar.$1.
// Need to have as many pwcs as placements on other side.
shouldErr("foo.*", "bar.*")
shouldErr("foo.*", "bar.$2") // Bad pwc token identifier
shouldErr("foo.*", "bar.$1.>") // fwcs have to match.
shouldErr("foo.>", "bar.baz") // fwcs have to match.
shouldErr("foo.*.*", "bar.$2") // Must place all pwcs.
shouldErr("foo.*", "foo.$foo") // invalid $ value
shouldErr("foo.*", "foo.{{wildcard(2)}}") // Mapping function being passed an out of range wildcard index
shouldErr("foo.*", "foo.{{unimplemented(1)}}") // Mapping trying to use an unknown mapping function
shouldErr("foo.*", "foo.{{partition(10)}}") // Not enough arguments passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard(foo)}}") // Invalid argument passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard()}}") // Not enough arguments passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard(1,2)}}") // Too many arguments passed to the mapping function
shouldErr("foo.*", "foo.{{ wildcard5) }}") // Bad mapping function
shouldErr("foo.*", "foo.{{splitLeft(2,2}}") // arg out of range

shouldBeOK := func(src, dest string) *transform {
t.Helper()
tr, err := newTransform(src, dest)
if err != nil {
t.Fatalf("Got an error %v for src=%q and dest=%q", err, src, dest)
}
return tr
}

shouldBeOK("foo", "bar")
shouldBeOK("foo.*.bar.*.baz", "req.$2.$1")
shouldBeOK("baz.>", "mybaz.>")
shouldBeOK("*", "{{splitfromleft(1,1)}}")

shouldMatch := func(src, dest, sample, expected string) {
t.Helper()
tr := shouldBeOK(src, dest)
s, err := tr.Match(sample)
if err != nil {
t.Fatalf("Got an error %v when expecting a match for %q to %q", err, sample, expected)
}
if s != expected {
t.Fatalf("Dest does not match what was expected. Got %q, expected %q", s, expected)
}
}

shouldMatch("foo", "bar", "foo", "bar")
shouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A")
shouldMatch("baz.>", "my.pre.>", "baz.1.2.3", "my.pre.1.2.3")
shouldMatch("baz.>", "foo.bar.>", "baz.1.2.3", "foo.bar.1.2.3")
shouldMatch("*", "foo.bar.$1", "foo", "foo.bar.foo")
shouldMatch("*", "{{splitfromleft(1,3)}}", "12345", "123.45")
shouldMatch("*", "{{SplitFromRight(1,3)}}", "12345", "12.345")
shouldMatch("*", "{{SliceFromLeft(1,3)}}", "1234567890", "123.456.789.0")
shouldMatch("*", "{{SliceFromRight(1,3)}}", "1234567890", "1.234.567.890")
shouldMatch("*", "{{split(1,-)}}", "-abc-def--ghi-", "abc.def.ghi")
shouldMatch("*", "{{split(1,-)}}", "abc-def--ghi-", "abc.def.ghi")
shouldMatch("*.*", "{{split(2,-)}}.{{splitfromleft(1,2)}}", "foo.-abc-def--ghij-", "abc.def.ghij.fo.o") // combo + checks split for multiple instance of deliminator and deliminator being at the start or end
}

func TestAccountSystemPermsWithGlobalAccess(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
Expand Down
14 changes: 6 additions & 8 deletions server/client.go
Expand Up @@ -2754,10 +2754,8 @@ func (c *client) addShadowSub(sub *subscription, ime *ime) (*subscription, error
if ime.overlapSubj != _EMPTY_ {
s = ime.overlapSubj
}
subj, err := im.rtr.transformSubject(s)
if err != nil {
return nil, err
}
subj := im.rtr.TransformSubject(s)

nsub.subject = []byte(subj)
} else if !im.usePub || (im.usePub && ime.overlapSubj != _EMPTY_) || !ime.dyn {
if ime.overlapSubj != _EMPTY_ {
Expand Down Expand Up @@ -3035,7 +3033,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac
// Remap subject if its a shadow subscription, treat like a normal client.
if rt.sub.im != nil {
if rt.sub.im.tr != nil {
to, _ := rt.sub.im.tr.transformSubject(string(subj))
to := rt.sub.im.tr.TransformSubject(string(subj))
subj = []byte(to)
} else if !rt.sub.im.usePub {
subj = []byte(rt.sub.im.to)
Expand Down Expand Up @@ -3991,7 +3989,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt

if si.tr != nil {
// FIXME(dlc) - This could be slow, may want to look at adding cache to bare transforms?
to, _ = si.tr.transformSubject(subject)
to = si.tr.TransformSubject(subject)
} else if si.usePub {
to = subject
}
Expand Down Expand Up @@ -4280,7 +4278,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
if sub.im.tr != nil {
to, _ := sub.im.tr.transformSubject(string(subject))
to := sub.im.tr.TransformSubject(string(subject))
dsubj = append(_dsubj[:0], to...)
} else if sub.im.usePub {
dsubj = append(_dsubj[:0], subj...)
Expand Down Expand Up @@ -4427,7 +4425,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
if sub.im.tr != nil {
to, _ := sub.im.tr.transformSubject(string(subject))
to := sub.im.tr.TransformSubject(string(subject))
dsubj = append(_dsubj[:0], to...)
} else if sub.im.usePub {
dsubj = append(_dsubj[:0], subj...)
Expand Down
33 changes: 2 additions & 31 deletions server/consumer.go
Expand Up @@ -456,17 +456,8 @@ func checkConsumerCfg(
}
}

// As best we can make sure the filtered subject is valid.
if config.FilterSubject != _EMPTY_ {
subjects := copyStrings(cfg.Subjects)
// explicitly skip validFilteredSubject when recovering
hasExt := isRecovering
if !isRecovering {
subjects, hasExt = gatherSourceMirrorSubjects(subjects, cfg, acc)
}
if !hasExt && !validFilteredSubject(config.FilterSubject, subjects) {
return NewJSConsumerFilterNotSubsetError()
}
if config.FilterSubject != _EMPTY_ && !IsValidSubject(config.FilterSubject) {
return NewJSStreamInvalidConfigError(ErrBadSubject)
}

// Helper function to formulate similar errors.
Expand Down Expand Up @@ -4262,26 +4253,6 @@ func deliveryFormsCycle(cfg *StreamConfig, deliverySubject string) bool {
return false
}

// Check that the filtered subject is valid given a set of stream subjects.
func validFilteredSubject(filteredSubject string, subjects []string) bool {
if !IsValidSubject(filteredSubject) {
return false
}
hasWC := subjectHasWildcard(filteredSubject)

for _, subject := range subjects {
if subjectIsSubsetMatch(filteredSubject, subject) {
return true
}
// If we have a wildcard as the filtered subject check to see if we are
// a wider scope but do match a subject.
if hasWC && subjectIsSubsetMatch(subject, filteredSubject) {
return true
}
}
return false
}

// switchToEphemeral is called on startup when recovering ephemerals.
func (o *consumer) switchToEphemeral() {
o.mu.Lock()
Expand Down
41 changes: 2 additions & 39 deletions server/jetstream_cluster_1_test.go
Expand Up @@ -5544,39 +5544,6 @@ func TestJetStreamClusterMirrorAndSourcesClusterRestart(t *testing.T) {
})
}

func TestJetStreamClusterSourceFilterSubjectUpdateFail(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSR", 3)
defer c.shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Origin
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 2,
})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "S",
Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "notthere"}},
Replicas: 2,
})
require_Error(t, err)
require_Equal(t, err.Error(), "nats: source 'TEST' filter subject 'notthere' does not overlap with any origin stream subject")

_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Mirror: &nats.StreamSource{Name: "TEST", FilterSubject: "notthere"},
Replicas: 2,
})
require_Error(t, err)
require_Equal(t, err.Error(), "nats: mirror 'TEST' filter subject 'notthere' does not overlap with any origin stream subject")
}

func TestJetStreamClusterMirrorAndSourcesFilteredConsumers(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterMirrorSourceImportsTempl, "MS5", 5)
defer c.shutdown()
Expand Down Expand Up @@ -5625,9 +5592,8 @@ func TestJetStreamClusterMirrorAndSourcesFilteredConsumers(t *testing.T) {
createConsumer("M", "foo")
createConsumer("M", "bar")
createConsumer("M", "baz.foo")
expectFail("M", "baz")
expectFail("M", "baz.1.2")
expectFail("M", "apple")
expectFail("M", ".")
expectFail("M", ">.foo")

// Make sure wider scoped subjects work as well.
createConsumer("M", "*")
Expand All @@ -5652,9 +5618,6 @@ func TestJetStreamClusterMirrorAndSourcesFilteredConsumers(t *testing.T) {

createConsumer("S", "foo.1")
createConsumer("S", "bar.1")
expectFail("S", "baz")
expectFail("S", "baz.1")
expectFail("S", "apple")

// Now cross account stuff.
nc2, js2 := jsClientConnect(t, s, nats.UserInfo("rip", "pass"))
Expand Down