Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mappings breaking changes #3823

Merged
merged 1 commit into from Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions server/accounts.go
Expand Up @@ -619,7 +619,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
if err != nil {
return err
}
tr, err := newSubjectTransform(src, d.Subject)
tr, err := NewSubjectTransform(src, d.Subject)
if err != nil {
return err
}
Expand Down Expand Up @@ -650,7 +650,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
// We need to make the appropriate markers for the wildcards etc.
dest = transformTokenize(dest)
}
tr, err := newSubjectTransform(src, dest)
tr, err := NewSubjectTransform(src, dest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1828,7 +1828,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
} else {
to, _ = transformUntokenize(to)
// Create a transform. Do so in reverse such that $ symbols only exist in to
if tr, err = newSubjectTransform(to, transformTokenize(from)); err != nil {
if tr, err = NewSubjectTransform(to, transformTokenize(from)); err != nil {
a.mu.Unlock()
return nil, fmt.Errorf("failed to create mapping transform for service import subject %q to %q: %v",
from, to, err)
Expand Down Expand Up @@ -2375,7 +2375,7 @@ func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to stri
usePub = true
} else {
// Create a transform
if tr, err = newSubjectTransform(from, transformTokenize(to)); err != nil {
if tr, err = NewSubjectTransform(from, transformTokenize(to)); err != nil {
return fmt.Errorf("failed to create mapping transform for stream import subject %q to %q: %v",
from, to, err)
}
Expand Down
14 changes: 7 additions & 7 deletions server/stream.go
Expand Up @@ -427,7 +427,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
ssi.setIndexName(i)
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = newSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform for the source not valid %w", err)
}
Expand Down Expand Up @@ -480,7 +480,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt

// Check for input subject transform
if cfg.SubjectTransform != nil {
tr, err := newSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream input subject transform not valid %w", err)
Expand All @@ -490,7 +490,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt

// Check for RePublish.
if cfg.RePublish != nil {
tr, err := newSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream republish transform not valid %w", err)
Expand Down Expand Up @@ -1280,7 +1280,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if formsCycle {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle"))
}
if _, err := newSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination); err != nil {
if _, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination); err != nil {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish not valid"))
}
}
Expand Down Expand Up @@ -1511,7 +1511,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
// Check for transform.
if s.SubjectTransformDest != _EMPTY_ {
var err error
if si.tr, err = newSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
return err
}
}
Expand Down Expand Up @@ -1568,7 +1568,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
if cfg.RePublish.Source == _EMPTY_ {
cfg.RePublish.Source = fwcs
}
tr, err := newSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return fmt.Errorf("stream configuration for republish not valid")
Expand Down Expand Up @@ -2868,7 +2868,7 @@ func (mset *stream) startingSequenceForSources() {
si := &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject}
// Check for transform.
if ssi.SubjectTransformDest != _EMPTY_ {
si.tr, _ = newSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) // can not return an error because validated in AddStream
si.tr, _ = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) // can not return an error because validated in AddStream
}

mset.sources[ssi.iname] = si
Expand Down
10 changes: 5 additions & 5 deletions server/subject_transform.go
Expand Up @@ -56,17 +56,17 @@ type subjectTransform struct {
dtokmfstringargs []string // destination token mapping function string arguments
}

// SubjectTransform transforms subjects using mappings
// SubjectTransformer transforms subjects using mappings
//
// This API is not part of the public API and not subject to SemVer protections
type SubjectTransform interface {
type SubjectTransformer interface {
// TODO(dlc) - We could add in client here to allow for things like foo -> foo.$ACCOUNT
Match(string) (string, error)
TransformSubject(subject string) string
TransformTokenizedSubject(tokens []string) string
}

func newSubjectTransform(src, dest string) (*subjectTransform, error) {
func NewSubjectTransform(src, dest string) (*subjectTransform, error) {
// No source given is equivalent to the source being ">"
if src == _EMPTY_ {
src = fwcs
Expand Down Expand Up @@ -493,7 +493,7 @@ func (tr *subjectTransform) TransformTokenizedSubject(tokens []string) string {
// Reverse a subjectTransform.
func (tr *subjectTransform) reverse() *subjectTransform {
if len(tr.dtokmftokindexesargs) == 0 {
rtr, _ := newSubjectTransform(tr.dest, tr.src)
rtr, _ := NewSubjectTransform(tr.dest, tr.src)
return rtr
}
// If we are here we need to dynamically get the correct reverse
Expand All @@ -513,6 +513,6 @@ func (tr *subjectTransform) reverse() *subjectTransform {
}
}
ndest := strings.Join(nda, tsep)
rtr, _ := newSubjectTransform(nsrc, ndest)
rtr, _ := NewSubjectTransform(nsrc, ndest)
return rtr
}
4 changes: 2 additions & 2 deletions server/subject_transform_test.go
Expand Up @@ -76,7 +76,7 @@ func TestPlaceHolderIndex(t *testing.T) {
func TestSubjectTransforms(t *testing.T) {
shouldErr := func(src, dest string) {
t.Helper()
if _, err := newSubjectTransform(src, dest); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) {
if _, err := NewSubjectTransform(src, dest); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) {
t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest)
}
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestSubjectTransforms(t *testing.T) {

shouldBeOK := func(src, dest string) *subjectTransform {
t.Helper()
tr, err := newSubjectTransform(src, dest)
tr, err := NewSubjectTransform(src, dest)
if err != nil {
t.Fatalf("Got an error %v for src=%q and dest=%q", err, src, dest)
}
Expand Down