Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Ability to drop partial wildcard tokens in some subject transforms #4152

Merged
merged 4 commits into from May 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions server/accounts.go
Expand Up @@ -1903,7 +1903,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
} else {
to, _ = transformUntokenize(to)
// Create a transform. Do so in reverse such that $ symbols only exist in to
if tr, err = NewSubjectTransform(to, transformTokenize(from)); err != nil {
if tr, err = NewSubjectTransformStrict(to, transformTokenize(from)); err != nil {
a.mu.Unlock()
return nil, fmt.Errorf("failed to create mapping transform for service import subject from %q to %q: %v",
from, to, err)
Expand Down Expand Up @@ -2256,7 +2256,7 @@ func (si *serviceImport) isRespServiceImport() bool {
return si != nil && si.response
}

// Sets the response theshold timer for a service export.
// Sets the response threshold timer for a service export.
// Account lock should be held
func (se *serviceExport) setResponseThresholdTimer() {
if se.rtmr != nil {
Expand Down Expand Up @@ -2459,7 +2459,7 @@ func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to stri
usePub = true
} else {
// Create a transform
if tr, err = NewSubjectTransform(from, transformTokenize(to)); err != nil {
if tr, err = NewSubjectTransformStrict(from, transformTokenize(to)); err != nil {
return fmt.Errorf("failed to create mapping transform for stream import subject from %q to %q: %v",
from, to, err)
}
Expand Down
3 changes: 3 additions & 0 deletions server/errors.go
Expand Up @@ -219,6 +219,9 @@ var (

// ErrorMappingDestinationFunctionTooManyArguments is returned when the mapping destination function is passed too many arguments
ErrorMappingDestinationFunctionTooManyArguments = fmt.Errorf("%w: too many arguments passed to the function", ErrInvalidMappingDestination)

// ErrorMappingDestinationFunctionNotSupportedForImport is returned when you try to use a mapping function other than wildcard in a transform that needs to be reversible (i.e. an import)
ErrorMappingDestinationFunctionNotSupportedForImport = fmt.Errorf("%w: the only mapping function allowed for import transforms is {{Wildcard()}}", ErrInvalidMappingDestination)
)

// mappingDestinationErr is a type of subject mapping destination error
Expand Down
31 changes: 24 additions & 7 deletions server/subject_transform.go
Expand Up @@ -69,7 +69,9 @@ type SubjectTransformer interface {
TransformTokenizedSubject(tokens []string) string
}

func NewSubjectTransform(src, dest string) (*subjectTransform, error) {
func NewSubjectTransformWithStrict(src, dest string, strict bool) (*subjectTransform, error) {
// strict = true for import subject mappings that need to be reversible
// (meaning can only use the Wildcard function and must use all the pwcs that are present in the source)
// No source given is equivalent to the source being ">"
if src == _EMPTY_ {
src = fwcs
Expand Down Expand Up @@ -106,6 +108,12 @@ func NewSubjectTransform(src, dest string) (*subjectTransform, error) {
return nil, err
}

if strict {
if tranformType != NoTransform && tranformType != Wildcard {
return nil, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotSupportedForImport}
}
}

if tranformType == NoTransform {
dtokMappingFunctionTypes = append(dtokMappingFunctionTypes, NoTransform)
dtokMappingFunctionTokenIndexes = append(dtokMappingFunctionTokenIndexes, []int{-1})
Expand All @@ -128,7 +136,7 @@ func NewSubjectTransform(src, dest string) (*subjectTransform, error) {

}
}
if nphs < npwcs {
if strict && nphs < npwcs {
// not all wildcards are being used in the destination
return nil, &mappingDestinationErr{dest, ErrMappingDestinationNotUsingAllWildcards}
}
Expand All @@ -146,6 +154,14 @@ func NewSubjectTransform(src, dest string) (*subjectTransform, error) {
}, nil
}

func NewSubjectTransform(src, dest string) (*subjectTransform, error) {
return NewSubjectTransformWithStrict(src, dest, false)
}

func NewSubjectTransformStrict(src, dest string) (*subjectTransform, error) {
return NewSubjectTransformWithStrict(src, dest, true)
}

func getMappingFunctionArgs(functionRegEx *regexp.Regexp, token string) []string {
commandStrings := functionRegEx.FindStringSubmatch(token)
if len(commandStrings) > 1 {
Expand Down Expand Up @@ -304,14 +320,15 @@ func transformTokenize(subject string) string {
// Helper function to go from transform destination to a subject with partial wildcards and ordered list of placeholders
// E.g.:
//
// "bar" -> "bar", []
// "foo.$2.$1" -> "foo.*.*", ["$2","$1"]
// "bar" -> "bar", []
// "foo.$2.$1" -> "foo.*.*", ["$2","$1"]
// "foo.{{wildcard(2)}}.{{wildcard(1)}}" -> "foo.*.*", ["{{wildcard(2)}}","{{wildcard(1)}}"]
func transformUntokenize(subject string) (string, []string) {
var phs []string
var nda []string

for _, token := range strings.Split(subject, tsep) {
if len(token) > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9' {
if args := getMappingFunctionArgs(wildcardMappingFunctionRegEx, token); (len(token) > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9') || (len(args) == 1 && args[0] != _EMPTY_) {
phs = append(phs, token)
nda = append(nda, pwcs)
} else {
Expand Down Expand Up @@ -496,7 +513,7 @@ func (tr *subjectTransform) TransformTokenizedSubject(tokens []string) string {
// Reverse a subjectTransform.
func (tr *subjectTransform) reverse() *subjectTransform {
if len(tr.dtokmftokindexesargs) == 0 {
rtr, _ := NewSubjectTransform(tr.dest, tr.src)
rtr, _ := NewSubjectTransformStrict(tr.dest, tr.src)
return rtr
}
// If we are here we need to dynamically get the correct reverse
Expand All @@ -516,6 +533,6 @@ func (tr *subjectTransform) reverse() *subjectTransform {
}
}
ndest := strings.Join(nda, tsep)
rtr, _ := NewSubjectTransform(nsrc, ndest)
rtr, _ := NewSubjectTransformStrict(nsrc, ndest)
return rtr
}
114 changes: 82 additions & 32 deletions server/subject_transform_test.go
Expand Up @@ -73,58 +73,107 @@ func TestPlaceHolderIndex(t *testing.T) {
}
}

func TestSubjectTransformHelpers(t *testing.T) {
equals := func(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}

filter, placeHolders := transformUntokenize("bar")
if filter != "bar" || len(placeHolders) != 0 {
t.Fatalf("transformUntokenize for not returning expected result")
}

filter, placeHolders = transformUntokenize("foo.$2.$1")
if filter != "foo.*.*" || !equals(placeHolders, []string{"$2", "$1"}) {
t.Fatalf("transformUntokenize for not returning expected result")
}

filter, placeHolders = transformUntokenize("foo.{{wildcard(2)}}.{{wildcard(1)}}")
if filter != "foo.*.*" || !equals(placeHolders, []string{"{{wildcard(2)}}", "{{wildcard(1)}}"}) {
t.Fatalf("transformUntokenize for not returning expected result")
}

newReversibleTransform := func(src, dest string) *subjectTransform {
tr, err := NewSubjectTransformStrict(src, dest)
if err != nil {
t.Fatalf("Error getting reversible transform: %s to %s", src, dest)
}
return tr
}

tr := newReversibleTransform("foo.*.*", "bar.$2.{{Wildcard(1)}}")
subject := "foo.b.a"
transformed := tr.TransformSubject(subject)
reverse := tr.reverse()
if reverse.TransformSubject(transformed) != subject {
t.Fatal("Reversed transform subject not matching")
}
}

func TestSubjectTransforms(t *testing.T) {
shouldErr := func(src, dest string) {
shouldErr := func(src, dest string, strict bool) {
t.Helper()
if _, err := NewSubjectTransform(src, dest); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) {
if _, err := NewSubjectTransformWithStrict(src, dest, strict); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) {
t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest)
}
}

shouldErr("foo.*.*", "bar.$2") // Must place all pwcs.

// Must be valid subjects.
shouldErr("foo", "")
shouldErr("foo..", "bar")
shouldErr("foo", "", false)
shouldErr("foo..", "bar", false)

// Wildcards are allowed in src, but must be matched by token placements on the other side.
// e.g. foo.* -> bar.$1.
// Need to have as many pwcs as placements on other side.
shouldErr("foo.*", "bar.*")
shouldErr("foo.*", "bar.$2") // Bad pwc token identifier
shouldErr("foo.*", "bar.$1.>") // fwcs have to match.
shouldErr("foo.>", "bar.baz") // fwcs have to match.
shouldErr("foo.*.*", "bar.$2") // Must place all pwcs.
shouldErr("foo.*", "foo.$foo") // invalid $ value
shouldErr("foo.*", "foo.{{wildcard(2)}}") // Mapping function being passed an out of range wildcard index
shouldErr("foo.*", "foo.{{unimplemented(1)}}") // Mapping trying to use an unknown mapping function
shouldErr("foo.*", "foo.{{partition(10)}}") // Not enough arguments passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard(foo)}}") // Invalid argument passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard()}}") // Not enough arguments passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard(1,2)}}") // Too many arguments passed to the mapping function
shouldErr("foo.*", "foo.{{ wildcard5) }}") // Bad mapping function
shouldErr("foo.*", "foo.{{splitLeft(2,2}}") // arg out of range

shouldBeOK := func(src, dest string) *subjectTransform {
// Need to have as many pwcs as placements on other side

shouldErr("foo.*", "bar.*", false)
shouldErr("foo.*", "bar.$2", false) // Bad pwc token identifier
shouldErr("foo.*", "bar.$1.>", false) // fwcs have to match.
shouldErr("foo.>", "bar.baz", false) // fwcs have to match.
shouldErr("foo.*.*", "bar.$2", true) // Must place all pwcs.
shouldErr("foo.*", "foo.$foo", true) // invalid $ value
shouldErr("foo.*", "bar.{{Partition(2,1)}}", true) // can only use Wildcard function (and old-style $x) in import transform
shouldErr("foo.*", "foo.{{wildcard(2)}}", false) // Mapping function being passed an out of range wildcard index
shouldErr("foo.*", "foo.{{unimplemented(1)}}", false) // Mapping trying to use an unknown mapping function
shouldErr("foo.*", "foo.{{partition(10)}}", false) // Not enough arguments passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard(foo)}}", false) // Invalid argument passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard()}}", false) // Not enough arguments passed to the mapping function
shouldErr("foo.*", "foo.{{wildcard(1,2)}}", false) // Too many arguments passed to the mapping function
shouldErr("foo.*", "foo.{{ wildcard5) }}", false) // Bad mapping function
shouldErr("foo.*", "foo.{{splitLeft(2,2}}", false) // arg out of range

shouldBeOK := func(src, dest string, strict bool) *subjectTransform {
t.Helper()
tr, err := NewSubjectTransform(src, dest)
tr, err := NewSubjectTransformWithStrict(src, dest, strict)
if err != nil {
t.Fatalf("Got an error %v for src=%q and dest=%q", err, src, dest)
}
return tr
}

shouldBeOK("foo", "bar")
shouldBeOK("foo.*.bar.*.baz", "req.$2.$1")
shouldBeOK("baz.>", "mybaz.>")
shouldBeOK("*", "{{splitfromleft(1,1)}}")
shouldBeOK("", "prefix.>")
shouldBeOK("*.*", "{{partition(10,1,2)}}")
shouldBeOK("foo.*.*", "foo.{{wildcard(1)}}.{{wildcard(2)}}.{{partition(5,1,2)}}")
shouldBeOK("foo.*", "bar.{{Wildcard(1)}}", true)

shouldBeOK("foo.*.*", "bar.$2", false) // don't have to use all pwcs.
shouldBeOK("foo.*.*", "bar.{{wildcard(1)}}", false) // don't have to use all pwcs.
shouldBeOK("foo", "bar", false)
shouldBeOK("foo.*.bar.*.baz", "req.$2.$1", false)
shouldBeOK("baz.>", "mybaz.>", false)
shouldBeOK("*", "{{splitfromleft(1,1)}}", false)
shouldBeOK("", "prefix.>", false)
shouldBeOK("*.*", "{{partition(10,1,2)}}", false)
shouldBeOK("foo.*.*", "foo.{{wildcard(1)}}.{{wildcard(2)}}.{{partition(5,1,2)}}", false)

shouldMatch := func(src, dest, sample, expected string) {
t.Helper()
tr := shouldBeOK(src, dest)
tr := shouldBeOK(src, dest, false)
s, err := tr.Match(sample)
if err != nil {
t.Fatalf("Got an error %v when expecting a match for %q to %q", err, sample, expected)
Expand All @@ -137,6 +186,7 @@ func TestSubjectTransforms(t *testing.T) {
shouldMatch("", "prefix.>", "foo", "prefix.foo")
shouldMatch("foo", "bar", "foo", "bar")
shouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A")
shouldMatch("foo.*.bar.*.baz", "req.{{wildcard(2)}}.{{wildcard(1)}}", "foo.A.bar.B.baz", "req.B.A")
shouldMatch("baz.>", "my.pre.>", "baz.1.2.3", "my.pre.1.2.3")
shouldMatch("baz.>", "foo.bar.>", "baz.1.2.3", "foo.bar.1.2.3")
shouldMatch("*", "foo.bar.$1", "foo", "foo.bar.foo")
Expand Down
44 changes: 12 additions & 32 deletions test/accounts_cycles_test.go
Expand Up @@ -415,15 +415,15 @@ func TestAccountSubjectMapping(t *testing.T) {
}
}

// test token and partition subject mapping within an account
// test token subject mapping within an account
// Alice imports from Bob with subject mapping
func TestAccountImportSubjectMapping(t *testing.T) {
conf := createConfFile(t, []byte(`
port: -1
accounts {
A {
users: [{user: a, pass: x}]
imports [ {stream: {account: B, subject: "foo.*.*"}, to : "foo.$1.{{wildcard(2)}}.{{partition(10,1,2)}}"}]
imports [ {stream: {account: B, subject: "foo.*.*"}, to : "foo.$1.{{wildcard(2)}}"}]
}
B {
users: [{user: b, pass x}]
Expand All @@ -442,57 +442,37 @@ func TestAccountImportSubjectMapping(t *testing.T) {
subjectsReceived := make(chan string)

msg := []byte("HELLO")
sub1, err := ncA.Subscribe("foo.*.*.*", func(m *nats.Msg) {
sub1, err := ncA.Subscribe("foo.*.*", func(m *nats.Msg) {
subjectsReceived <- m.Subject
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub1.AutoUnsubscribe(numMessages * 2)
sub1.AutoUnsubscribe(numMessages)

ncB := clientConnectToServerWithUP(t, opts, "b", "x")
defer ncB.Close()

// publish numMessages with an increasing id (should map to partition numbers with the range of 10 partitions) - twice
for j := 0; j < 2; j++ {
for i := 0; i < numMessages; i++ {
err = ncB.Publish(fmt.Sprintf("foo.%d.%d", i, numMessages-i), msg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
}

// verify all the partition numbers are in the expected range
partitionsReceived := make([]int, numMessages)
// publish numMessages with an increasing id

for i := 0; i < numMessages; i++ {
subject := <-subjectsReceived
sTokens := strings.Split(subject, ".")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
t1, _ := strconv.Atoi(sTokens[1])
t2, _ := strconv.Atoi(sTokens[2])
partitionsReceived[i], err = strconv.Atoi(sTokens[3])
err = ncB.Publish(fmt.Sprintf("foo.%d.%d", i, numMessages-i), msg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if partitionsReceived[i] > 9 || partitionsReceived[i] < 0 || t1 != i || t2 != numMessages-i {
t.Fatalf("Error received unexpected %d.%d to partition %d", t1, t2, partitionsReceived[i])
}
}

// verify hashing is deterministic by checking it produces the same exact result twice
for i := 0; i < numMessages; i++ {
subject := <-subjectsReceived
partitionNumber, err := strconv.Atoi(strings.Split(subject, ".")[3])
sTokens := strings.Split(subject, ".")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partitionsReceived[i] != partitionNumber {
t.Fatalf("Error: same id mapped to two different partitions")
t1, _ := strconv.Atoi(sTokens[1])
t2, _ := strconv.Atoi(sTokens[2])

if t1 != i || t2 != numMessages-i {
t.Fatalf("Error received unexpected %d.%d", t1, t2)
}
}
}
Expand Down