Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Full StreamSource (filters, transforms) functionality to stream mirror #4354

Merged
merged 6 commits into from Aug 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 31 additions & 1 deletion server/errors.json
Expand Up @@ -1423,7 +1423,7 @@
"constant": "JSSourceMultipleFiltersNotAllowed",
"code": 400,
"error_code": 10144,
"description": "source with multiple subject filters cannot also have a single subject filter",
"description": "source with multiple subject transforms cannot also have a single subject filter",
"comment": "",
"help": "",
"url": "",
Expand Down Expand Up @@ -1478,5 +1478,35 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorMultipleFiltersNotAllowed",
"code": 400,
"error_code": 10150,
"description": "mirror with multiple subject transforms cannot also have a single subject filter",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorInvalidSubjectFilter",
"code": 400,
"error_code": 10151,
"description": "mirror subject filter is invalid",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorOverlappingSubjectFilters",
"code": 400,
"error_code": 10152,
"description": "mirror subject filters can not overlap",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
48 changes: 45 additions & 3 deletions server/jetstream_errors_generated.go
Expand Up @@ -233,9 +233,18 @@ const (
// JSMirrorInvalidStreamName mirrored stream name is invalid
JSMirrorInvalidStreamName ErrorIdentifier = 10142

// JSMirrorInvalidSubjectFilter mirror subject filter is invalid
JSMirrorInvalidSubjectFilter ErrorIdentifier = 10151

// JSMirrorMaxMessageSizeTooBigErr stream mirror must have max message size >= source
JSMirrorMaxMessageSizeTooBigErr ErrorIdentifier = 10030

// JSMirrorMultipleFiltersNotAllowed mirror with multiple subject transforms cannot also have a single subject filter
JSMirrorMultipleFiltersNotAllowed ErrorIdentifier = 10150

// JSMirrorOverlappingSubjectFilters mirror subject filters can not overlap
JSMirrorOverlappingSubjectFilters ErrorIdentifier = 10152

// JSMirrorWithFirstSeqErr stream mirrors can not have first sequence configured
JSMirrorWithFirstSeqErr ErrorIdentifier = 10143

Expand Down Expand Up @@ -305,7 +314,7 @@ const (
// JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target
JSSourceMaxMessageSizeTooBigErr ErrorIdentifier = 10046

// JSSourceMultipleFiltersNotAllowed source with multiple subject filters cannot also have a single subject filter
// JSSourceMultipleFiltersNotAllowed source with multiple subject transforms cannot also have a single subject filter
JSSourceMultipleFiltersNotAllowed ErrorIdentifier = 10144

// JSSourceOverlappingSubjectFilters source filters can not overlap
Expand Down Expand Up @@ -528,7 +537,10 @@ var (
JSMemoryResourcesExceededErr: {Code: 500, ErrCode: 10028, Description: "insufficient memory resources available"},
JSMirrorConsumerSetupFailedErrF: {Code: 500, ErrCode: 10029, Description: "{err}"},
JSMirrorInvalidStreamName: {Code: 400, ErrCode: 10142, Description: "mirrored stream name is invalid"},
JSMirrorInvalidSubjectFilter: {Code: 400, ErrCode: 10151, Description: "mirror subject filter is invalid"},
JSMirrorMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10030, Description: "stream mirror must have max message size >= source"},
JSMirrorMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10150, Description: "mirror with multiple subject transforms cannot also have a single subject filter"},
JSMirrorOverlappingSubjectFilters: {Code: 400, ErrCode: 10152, Description: "mirror subject filters can not overlap"},
JSMirrorWithFirstSeqErr: {Code: 400, ErrCode: 10143, Description: "stream mirrors can not have first sequence configured"},
JSMirrorWithSourcesErr: {Code: 400, ErrCode: 10031, Description: "stream mirrors can not also contain other sources"},
JSMirrorWithStartSeqAndTimeErr: {Code: 400, ErrCode: 10032, Description: "stream mirrors can not have both start seq and start time configured"},
Expand All @@ -552,7 +564,7 @@ var (
JSSourceInvalidSubjectFilter: {Code: 400, ErrCode: 10145, Description: "source subject filter is invalid"},
JSSourceInvalidTransformDestination: {Code: 400, ErrCode: 10146, Description: "source transform destination is invalid"},
JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"},
JSSourceMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10144, Description: "source with multiple subject filters cannot also have a single subject filter"},
JSSourceMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10144, Description: "source with multiple subject transforms cannot also have a single subject filter"},
JSSourceOverlappingSubjectFilters: {Code: 400, ErrCode: 10147, Description: "source filters can not overlap"},
JSStorageResourcesExceededErr: {Code: 500, ErrCode: 10047, Description: "insufficient storage resources available"},
JSStreamAssignmentErrF: {Code: 500, ErrCode: 10048, Description: "{err}"},
Expand Down Expand Up @@ -1451,6 +1463,16 @@ func NewJSMirrorInvalidStreamNameError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSMirrorInvalidStreamName]
}

// NewJSMirrorInvalidSubjectFilterError creates a new JSMirrorInvalidSubjectFilter error: "mirror subject filter is invalid"
func NewJSMirrorInvalidSubjectFilterError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMirrorInvalidSubjectFilter]
}

// NewJSMirrorMaxMessageSizeTooBigError creates a new JSMirrorMaxMessageSizeTooBigErr error: "stream mirror must have max message size >= source"
func NewJSMirrorMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand All @@ -1461,6 +1483,26 @@ func NewJSMirrorMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSMirrorMaxMessageSizeTooBigErr]
}

// NewJSMirrorMultipleFiltersNotAllowedError creates a new JSMirrorMultipleFiltersNotAllowed error: "mirror with multiple subject transforms cannot also have a single subject filter"
func NewJSMirrorMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMirrorMultipleFiltersNotAllowed]
}

// NewJSMirrorOverlappingSubjectFiltersError creates a new JSMirrorOverlappingSubjectFilters error: "mirror subject filters can not overlap"
func NewJSMirrorOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMirrorOverlappingSubjectFilters]
}

// NewJSMirrorWithFirstSeqError creates a new JSMirrorWithFirstSeqErr error: "stream mirrors can not have first sequence configured"
func NewJSMirrorWithFirstSeqError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down Expand Up @@ -1715,7 +1757,7 @@ func NewJSSourceMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSSourceMaxMessageSizeTooBigErr]
}

// NewJSSourceMultipleFiltersNotAllowedError creates a new JSSourceMultipleFiltersNotAllowed error: "source with multiple subject filters cannot also have a single subject filter"
// NewJSSourceMultipleFiltersNotAllowedError creates a new JSSourceMultipleFiltersNotAllowed error: "source with multiple subject transforms cannot also have a single subject filter"
func NewJSSourceMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
Expand Down
101 changes: 98 additions & 3 deletions server/jetstream_test.go
Expand Up @@ -11435,13 +11435,13 @@ func TestJetStreamMirrorBasics(t *testing.T) {
// Clear subjects.
cfg.Subjects = nil

// Source
// Mirrored
scfg := &nats.StreamConfig{
Name: "S1",
Subjects: []string{"foo", "bar", "baz"},
}

// Create source stream
// Create mirrored stream
createStreamOk(scfg)

// Now create our mirror stream.
Expand Down Expand Up @@ -11471,7 +11471,7 @@ func TestJetStreamMirrorBasics(t *testing.T) {
return nil
})

// Purge the source stream.
// Purge the mirrored stream.
if err := js.PurgeStream("S1"); err != nil {
t.Fatalf("Unexpected purge error: %v", err)
}
Expand Down Expand Up @@ -11551,6 +11551,101 @@ func TestJetStreamMirrorBasics(t *testing.T) {
}
return nil
})

// Test subject filtering and transformation
createStreamServerStreamConfig := func(cfg *StreamConfig, errToCheck uint16) {
t.Helper()
req, err := json.Marshal(cfg)
require_NoError(t, err)

rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
require_NoError(t, err)

var resp JSApiStreamCreateResponse
if err := json.Unmarshal(rm.Data, &resp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if errToCheck == 0 {
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}
} else {
if resp.Error.ErrCode != errToCheck {
t.Fatalf("Expected error %+v, got: %+v", errToCheck, resp.Error)
}
}
}

// check for errors
createStreamServerStreamConfig(&StreamConfig{
Name: "MBAD",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", FilterSubject: "foo", SubjectTransforms: []SubjectTransformConfig{{Source: "foo", Destination: "foo3"}}},
}, ApiErrors[JSMirrorMultipleFiltersNotAllowed].ErrCode)

createStreamServerStreamConfig(&StreamConfig{
Name: "MBAD",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: ".*.", Destination: "foo3"}}},
}, ApiErrors[JSMirrorInvalidSubjectFilter].ErrCode)

createStreamServerStreamConfig(&StreamConfig{
Name: "MBAD",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "*", Destination: "{{wildcard(2)}}"}}},
}, ApiErrors[JSStreamCreateErrF].ErrCode)

createStreamServerStreamConfig(&StreamConfig{
Name: "MBAD",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "foo", Destination: ""}, {Source: "foo", Destination: "bar"}}},
}, ApiErrors[JSMirrorOverlappingSubjectFilters].ErrCode)

createStreamServerStreamConfig(&StreamConfig{
Name: "M5",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", FilterSubject: "foo", SubjectTransformDest: "foo2"},
}, 0)

createStreamServerStreamConfig(&StreamConfig{
Name: "M6",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "bar", Destination: "bar2"}, {Source: "baz", Destination: "baz2"}}},
}, 0)

// Send 100 messages on foo (there should already be 50 messages on bar and 100 on baz in the stream)
for i := 0; i < 100; i++ {
if _, err := js.Publish("foo", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}

var f = func(streamName string, subject string, subjectNumMsgs uint64, streamNumMsg uint64, firstSeq uint64, lastSeq uint64) func() error {
return func() error {
si, err := js2.StreamInfo(streamName, &nats.StreamInfoRequest{SubjectsFilter: ">"})
require_NoError(t, err)
if ss, ok := si.State.Subjects[subject]; !ok {
t.Log("Expected messages with the transformed subject")
} else {
if ss != subjectNumMsgs {
t.Fatalf("Expected %d messages on the transformed subject but got %d", subjectNumMsgs, ss)
}
}
if si.State.Msgs != streamNumMsg {
return fmt.Errorf("Expected %d stream messages, got state: %+v", streamNumMsg, si.State)
}
if si.State.FirstSeq != firstSeq || si.State.LastSeq != lastSeq {
return fmt.Errorf("Expected first sequence=%d and last sequence=%d, but got state: %+v", firstSeq, lastSeq, si.State)
}
return nil
}
}

checkFor(t, 2*time.Second, 100*time.Millisecond, f("M5", "foo2", 100, 100, 251, 350))
checkFor(t, 2*time.Second, 100*time.Millisecond, f("M6", "bar2", 50, 150, 101, 250))
checkFor(t, 2*time.Second, 100*time.Millisecond, f("M6", "baz2", 100, 150, 101, 250))

}

func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) {
Expand Down