Skip to content

Commit

Permalink
Merge pull request #3823 from ripienaar/mappings_breaking_changes
Browse files Browse the repository at this point in the history
Mappings breaking changes
  • Loading branch information
derekcollison committed Jan 27, 2023
2 parents 97d788d + 348c84a commit b9f220e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 18 deletions.
8 changes: 4 additions & 4 deletions server/accounts.go
Expand Up @@ -619,7 +619,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
if err != nil {
return err
}
tr, err := newSubjectTransform(src, d.Subject)
tr, err := NewSubjectTransform(src, d.Subject)
if err != nil {
return err
}
Expand Down Expand Up @@ -650,7 +650,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
// We need to make the appropriate markers for the wildcards etc.
dest = transformTokenize(dest)
}
tr, err := newSubjectTransform(src, dest)
tr, err := NewSubjectTransform(src, dest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1828,7 +1828,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
} else {
to, _ = transformUntokenize(to)
// Create a transform. Do so in reverse such that $ symbols only exist in to
if tr, err = newSubjectTransform(to, transformTokenize(from)); err != nil {
if tr, err = NewSubjectTransform(to, transformTokenize(from)); err != nil {
a.mu.Unlock()
return nil, fmt.Errorf("failed to create mapping transform for service import subject %q to %q: %v",
from, to, err)
Expand Down Expand Up @@ -2375,7 +2375,7 @@ func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to stri
usePub = true
} else {
// Create a transform
if tr, err = newSubjectTransform(from, transformTokenize(to)); err != nil {
if tr, err = NewSubjectTransform(from, transformTokenize(to)); err != nil {
return fmt.Errorf("failed to create mapping transform for stream import subject %q to %q: %v",
from, to, err)
}
Expand Down
14 changes: 7 additions & 7 deletions server/stream.go
Expand Up @@ -430,7 +430,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
ssi.setIndexName(i)
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = newSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform for the source not valid %w", err)
}
Expand Down Expand Up @@ -483,7 +483,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt

// Check for input subject transform
if cfg.SubjectTransform != nil {
tr, err := newSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream input subject transform not valid %w", err)
Expand All @@ -493,7 +493,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt

// Check for RePublish.
if cfg.RePublish != nil {
tr, err := newSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream republish transform not valid %w", err)
Expand Down Expand Up @@ -1291,7 +1291,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if formsCycle {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle"))
}
if _, err := newSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination); err != nil {
if _, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination); err != nil {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish not valid"))
}
}
Expand Down Expand Up @@ -1522,7 +1522,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
// Check for transform.
if s.SubjectTransformDest != _EMPTY_ {
var err error
if si.tr, err = newSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
return err
}
}
Expand Down Expand Up @@ -1579,7 +1579,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
if cfg.RePublish.Source == _EMPTY_ {
cfg.RePublish.Source = fwcs
}
tr, err := newSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return fmt.Errorf("stream configuration for republish not valid")
Expand Down Expand Up @@ -2879,7 +2879,7 @@ func (mset *stream) startingSequenceForSources() {
si := &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject}
// Check for transform.
if ssi.SubjectTransformDest != _EMPTY_ {
si.tr, _ = newSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) // can not return an error because validated in AddStream
si.tr, _ = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) // can not return an error because validated in AddStream
}

mset.sources[ssi.iname] = si
Expand Down
10 changes: 5 additions & 5 deletions server/subject_transform.go
Expand Up @@ -56,17 +56,17 @@ type subjectTransform struct {
dtokmfstringargs []string // destination token mapping function string arguments
}

// SubjectTransform transforms subjects using mappings
// SubjectTransformer transforms subjects using mappings
//
// This API is not part of the public API and not subject to SemVer protections
type SubjectTransform interface {
type SubjectTransformer interface {
// TODO(dlc) - We could add in client here to allow for things like foo -> foo.$ACCOUNT
Match(string) (string, error)
TransformSubject(subject string) string
TransformTokenizedSubject(tokens []string) string
}

func newSubjectTransform(src, dest string) (*subjectTransform, error) {
func NewSubjectTransform(src, dest string) (*subjectTransform, error) {
// No source given is equivalent to the source being ">"
if src == _EMPTY_ {
src = fwcs
Expand Down Expand Up @@ -493,7 +493,7 @@ func (tr *subjectTransform) TransformTokenizedSubject(tokens []string) string {
// Reverse a subjectTransform.
func (tr *subjectTransform) reverse() *subjectTransform {
if len(tr.dtokmftokindexesargs) == 0 {
rtr, _ := newSubjectTransform(tr.dest, tr.src)
rtr, _ := NewSubjectTransform(tr.dest, tr.src)
return rtr
}
// If we are here we need to dynamically get the correct reverse
Expand All @@ -513,6 +513,6 @@ func (tr *subjectTransform) reverse() *subjectTransform {
}
}
ndest := strings.Join(nda, tsep)
rtr, _ := newSubjectTransform(nsrc, ndest)
rtr, _ := NewSubjectTransform(nsrc, ndest)
return rtr
}
4 changes: 2 additions & 2 deletions server/subject_transform_test.go
Expand Up @@ -76,7 +76,7 @@ func TestPlaceHolderIndex(t *testing.T) {
func TestSubjectTransforms(t *testing.T) {
shouldErr := func(src, dest string) {
t.Helper()
if _, err := newSubjectTransform(src, dest); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) {
if _, err := NewSubjectTransform(src, dest); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) {
t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest)
}
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestSubjectTransforms(t *testing.T) {

shouldBeOK := func(src, dest string) *subjectTransform {
t.Helper()
tr, err := newSubjectTransform(src, dest)
tr, err := NewSubjectTransform(src, dest)
if err != nil {
t.Fatalf("Got an error %v for src=%q and dest=%q", err, src, dest)
}
Expand Down

0 comments on commit b9f220e

Please sign in to comment.