Skip to content

Commit

Permalink
Adds full StreamSource (filters, transforms) functionality to stream …
Browse files Browse the repository at this point in the history
…mirroring

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
  • Loading branch information
jnmoyne committed Aug 1, 2023
1 parent 33d1f85 commit 95aa322
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 15 deletions.
32 changes: 31 additions & 1 deletion server/errors.json
Expand Up @@ -1423,7 +1423,7 @@
"constant": "JSSourceMultipleFiltersNotAllowed",
"code": 400,
"error_code": 10144,
"description": "source with multiple subject filters cannot also have a single subject filter",
"description": "source with multiple subject transforms cannot also have a single subject filter",
"comment": "",
"help": "",
"url": "",
Expand Down Expand Up @@ -1458,5 +1458,35 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorMultipleFiltersNotAllowed",
"code": 400,
"error_code": 10148,
"description": "mirror with multiple subject transforms cannot also have a single subject filter",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorInvalidSubjectFilter",
"code": 400,
"error_code": 10149,
"description": "mirror subject filter is invalid",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorOverlappingSubjectFilters",
"code": 400,
"error_code": 10150,
"description": "mirror subject filters can not overlap",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
48 changes: 45 additions & 3 deletions server/jetstream_errors_generated.go
Expand Up @@ -227,9 +227,18 @@ const (
// JSMirrorInvalidStreamName mirrored stream name is invalid
JSMirrorInvalidStreamName ErrorIdentifier = 10142

// JSMirrorInvalidSubjectFilter mirror subject filter is invalid
JSMirrorInvalidSubjectFilter ErrorIdentifier = 10149

// JSMirrorMaxMessageSizeTooBigErr stream mirror must have max message size >= source
JSMirrorMaxMessageSizeTooBigErr ErrorIdentifier = 10030

// JSMirrorMultipleFiltersNotAllowed mirror with multiple subject transforms cannot also have a single subject filter
JSMirrorMultipleFiltersNotAllowed ErrorIdentifier = 10148

// JSMirrorOverlappingSubjectFilters mirror subject filters can not overlap
JSMirrorOverlappingSubjectFilters ErrorIdentifier = 10150

// JSMirrorWithFirstSeqErr stream mirrors can not have first sequence configured
JSMirrorWithFirstSeqErr ErrorIdentifier = 10143

Expand Down Expand Up @@ -299,7 +308,7 @@ const (
// JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target
JSSourceMaxMessageSizeTooBigErr ErrorIdentifier = 10046

// JSSourceMultipleFiltersNotAllowed source with multiple subject filters cannot also have a single subject filter
// JSSourceMultipleFiltersNotAllowed source with multiple subject transforms cannot also have a single subject filter
JSSourceMultipleFiltersNotAllowed ErrorIdentifier = 10144

// JSSourceOverlappingSubjectFilters source filters can not overlap
Expand Down Expand Up @@ -520,7 +529,10 @@ var (
JSMemoryResourcesExceededErr: {Code: 500, ErrCode: 10028, Description: "insufficient memory resources available"},
JSMirrorConsumerSetupFailedErrF: {Code: 500, ErrCode: 10029, Description: "{err}"},
JSMirrorInvalidStreamName: {Code: 400, ErrCode: 10142, Description: "mirrored stream name is invalid"},
JSMirrorInvalidSubjectFilter: {Code: 400, ErrCode: 10149, Description: "mirror subject filter is invalid"},
JSMirrorMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10030, Description: "stream mirror must have max message size >= source"},
JSMirrorMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10148, Description: "mirror with multiple subject transforms cannot also have a single subject filter"},
JSMirrorOverlappingSubjectFilters: {Code: 400, ErrCode: 10150, Description: "mirror subject filters can not overlap"},
JSMirrorWithFirstSeqErr: {Code: 400, ErrCode: 10143, Description: "stream mirrors can not have first sequence configured"},
JSMirrorWithSourcesErr: {Code: 400, ErrCode: 10031, Description: "stream mirrors can not also contain other sources"},
JSMirrorWithStartSeqAndTimeErr: {Code: 400, ErrCode: 10032, Description: "stream mirrors can not have both start seq and start time configured"},
Expand All @@ -544,7 +556,7 @@ var (
JSSourceInvalidSubjectFilter: {Code: 400, ErrCode: 10145, Description: "source subject filter is invalid"},
JSSourceInvalidTransformDestination: {Code: 400, ErrCode: 10146, Description: "source transform destination is invalid"},
JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"},
JSSourceMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10144, Description: "source with multiple subject filters cannot also have a single subject filter"},
JSSourceMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10144, Description: "source with multiple subject transforms cannot also have a single subject filter"},
JSSourceOverlappingSubjectFilters: {Code: 400, ErrCode: 10147, Description: "source filters can not overlap"},
JSStorageResourcesExceededErr: {Code: 500, ErrCode: 10047, Description: "insufficient storage resources available"},
JSStreamAssignmentErrF: {Code: 500, ErrCode: 10048, Description: "{err}"},
Expand Down Expand Up @@ -1423,6 +1435,16 @@ func NewJSMirrorInvalidStreamNameError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSMirrorInvalidStreamName]
}

// NewJSMirrorInvalidSubjectFilterError creates a new JSMirrorInvalidSubjectFilter error: "mirror subject filter is invalid"
func NewJSMirrorInvalidSubjectFilterError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMirrorInvalidSubjectFilter]
}

// NewJSMirrorMaxMessageSizeTooBigError creates a new JSMirrorMaxMessageSizeTooBigErr error: "stream mirror must have max message size >= source"
func NewJSMirrorMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand All @@ -1433,6 +1455,26 @@ func NewJSMirrorMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSMirrorMaxMessageSizeTooBigErr]
}

// NewJSMirrorMultipleFiltersNotAllowedError creates a new JSMirrorMultipleFiltersNotAllowed error: "mirror with multiple subject transforms cannot also have a single subject filter"
func NewJSMirrorMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMirrorMultipleFiltersNotAllowed]
}

// NewJSMirrorOverlappingSubjectFiltersError creates a new JSMirrorOverlappingSubjectFilters error: "mirror subject filters can not overlap"
func NewJSMirrorOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMirrorOverlappingSubjectFilters]
}

// NewJSMirrorWithFirstSeqError creates a new JSMirrorWithFirstSeqErr error: "stream mirrors can not have first sequence configured"
func NewJSMirrorWithFirstSeqError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down Expand Up @@ -1687,7 +1729,7 @@ func NewJSSourceMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSSourceMaxMessageSizeTooBigErr]
}

// NewJSSourceMultipleFiltersNotAllowedError creates a new JSSourceMultipleFiltersNotAllowed error: "source with multiple subject filters cannot also have a single subject filter"
// NewJSSourceMultipleFiltersNotAllowedError creates a new JSSourceMultipleFiltersNotAllowed error: "source with multiple subject transforms cannot also have a single subject filter"
func NewJSSourceMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
Expand Down
101 changes: 98 additions & 3 deletions server/jetstream_test.go
Expand Up @@ -11431,13 +11431,13 @@ func TestJetStreamMirrorBasics(t *testing.T) {
// Clear subjects.
cfg.Subjects = nil

// Source
// Mirrored
scfg := &nats.StreamConfig{
Name: "S1",
Subjects: []string{"foo", "bar", "baz"},
}

// Create source stream
// Create mirrored stream
createStreamOk(scfg)

// Now create our mirror stream.
Expand Down Expand Up @@ -11467,7 +11467,7 @@ func TestJetStreamMirrorBasics(t *testing.T) {
return nil
})

// Purge the source stream.
// Purge the mirrored stream.
if err := js.PurgeStream("S1"); err != nil {
t.Fatalf("Unexpected purge error: %v", err)
}
Expand Down Expand Up @@ -11547,6 +11547,101 @@ func TestJetStreamMirrorBasics(t *testing.T) {
}
return nil
})

// Test subject filtering and transformation
createStreamServerStreamConfig := func(cfg *StreamConfig, errToCheck uint16) {
t.Helper()
req, err := json.Marshal(cfg)
require_NoError(t, err)

rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
require_NoError(t, err)

var resp JSApiStreamCreateResponse
if err := json.Unmarshal(rm.Data, &resp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if errToCheck == 0 {
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}
} else {
if resp.Error.ErrCode != errToCheck {
t.Fatalf("Expected error %+v, got: %+v", errToCheck, resp.Error)
}
}
}

// check for errors
createStreamServerStreamConfig(&StreamConfig{
Name: "MBAD",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", FilterSubject: "foo", SubjectTransforms: []SubjectTransformConfig{{Source: "foo", Destination: "foo3"}}},
}, ApiErrors[JSMirrorMultipleFiltersNotAllowed].ErrCode)

createStreamServerStreamConfig(&StreamConfig{
Name: "MBAD",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: ".*.", Destination: "foo3"}}},
}, ApiErrors[JSMirrorInvalidSubjectFilter].ErrCode)

createStreamServerStreamConfig(&StreamConfig{
Name: "MBAD",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "*", Destination: "{{wildcard(2)}}"}}},
}, ApiErrors[JSStreamCreateErrF].ErrCode)

createStreamServerStreamConfig(&StreamConfig{
Name: "MBAD",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "foo", Destination: ""}, {Source: "foo", Destination: "bar"}}},
}, ApiErrors[JSMirrorOverlappingSubjectFilters].ErrCode)

createStreamServerStreamConfig(&StreamConfig{
Name: "M5",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", FilterSubject: "foo", SubjectTransformDest: "foo2"},
}, 0)

createStreamServerStreamConfig(&StreamConfig{
Name: "M6",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "bar", Destination: "bar2"}, {Source: "baz", Destination: "baz2"}}},
}, 0)

// Send 100 messages on foo (there should already be 50 messages on bar and 100 on baz in the stream)
for i := 0; i < 100; i++ {
if _, err := js.Publish("foo", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}

var f = func(streamName string, subject string, subjectNumMsgs uint64, streamNumMsg uint64, firstSeq uint64, lastSeq uint64) func() error {
return func() error {
si, err := js2.StreamInfo(streamName, &nats.StreamInfoRequest{SubjectsFilter: ">"})
require_NoError(t, err)
if ss, ok := si.State.Subjects[subject]; !ok {
t.Log("Expected messages with the transformed subject")
} else {
if ss != subjectNumMsgs {
t.Fatalf("Expected %d messages on the transformed subject but got %d", subjectNumMsgs, ss)
}
}
if si.State.Msgs != streamNumMsg {
return fmt.Errorf("Expected %d stream messages, got state: %+v", streamNumMsg, si.State)
}
if si.State.FirstSeq != firstSeq || si.State.LastSeq != lastSeq {
return fmt.Errorf("Expected first sequence=%d and last sequence=%d, but got state: %+v", firstSeq, lastSeq, si.State)
}
return nil
}
}

checkFor(t, 2*time.Second, 100*time.Millisecond, f("M5", "foo2", 100, 100, 251, 350))
checkFor(t, 2*time.Second, 100*time.Millisecond, f("M6", "bar2", 50, 150, 101, 250))
checkFor(t, 2*time.Second, 100*time.Millisecond, f("M6", "baz2", 100, 150, 101, 250))

}

func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) {
Expand Down

0 comments on commit 95aa322

Please sign in to comment.