Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Check for invalid stream name in sources #4222

Merged
merged 3 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions server/errors.json
Expand Up @@ -1388,5 +1388,25 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSSourceInvalidStreamName",
"code": 400,
"error_code": 10141,
"description": "sourced stream name is invalid",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
{
"constant": "JSMirrorInvalidStreamName",
"code": 400,
"error_code": 10142,
"description": "mirrored stream name is invalid",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
30 changes: 29 additions & 1 deletion server/jetstream_errors_generated.go
Expand Up @@ -224,6 +224,9 @@ const (
// JSMirrorConsumerSetupFailedErrF generic mirror consumer setup failure string ({err})
JSMirrorConsumerSetupFailedErrF ErrorIdentifier = 10029

// JSMirrorInvalidStreamName mirrored stream name is invalid
JSMirrorInvalidStreamName ErrorIdentifier = 10142

// JSMirrorMaxMessageSizeTooBigErr stream mirror must have max message size >= source
JSMirrorMaxMessageSizeTooBigErr ErrorIdentifier = 10030

Expand Down Expand Up @@ -278,9 +281,12 @@ const (
// JSSourceConsumerSetupFailedErrF General source consumer setup failure string ({err})
JSSourceConsumerSetupFailedErrF ErrorIdentifier = 10045

// JSSourceDuplicateDetected source stream, filter and transform must form a unique combination (duplicate source configuration detected)
// JSSourceDuplicateDetected source stream, filter and transform (plus external if present) must form a unique combination (duplicate source configuration detected)
JSSourceDuplicateDetected ErrorIdentifier = 10140

// JSSourceInvalidStreamName sourced stream name is invalid
JSSourceInvalidStreamName ErrorIdentifier = 10141

// JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target
JSSourceMaxMessageSizeTooBigErr ErrorIdentifier = 10046

Expand Down Expand Up @@ -498,6 +504,7 @@ var (
JSMaximumStreamsLimitErr: {Code: 400, ErrCode: 10027, Description: "maximum number of streams reached"},
JSMemoryResourcesExceededErr: {Code: 500, ErrCode: 10028, Description: "insufficient memory resources available"},
JSMirrorConsumerSetupFailedErrF: {Code: 500, ErrCode: 10029, Description: "{err}"},
JSMirrorInvalidStreamName: {Code: 400, ErrCode: 10142, Description: "mirrored stream name is invalid"},
JSMirrorMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10030, Description: "stream mirror must have max message size >= source"},
JSMirrorWithSourcesErr: {Code: 400, ErrCode: 10031, Description: "stream mirrors can not also contain other sources"},
JSMirrorWithStartSeqAndTimeErr: {Code: 400, ErrCode: 10032, Description: "stream mirrors can not have both start seq and start time configured"},
Expand All @@ -517,6 +524,7 @@ var (
JSSnapshotDeliverSubjectInvalidErr: {Code: 400, ErrCode: 10015, Description: "deliver subject not valid"},
JSSourceConsumerSetupFailedErrF: {Code: 500, ErrCode: 10045, Description: "{err}"},
JSSourceDuplicateDetected: {Code: 400, ErrCode: 10140, Description: "duplicate source configuration detected"},
JSSourceInvalidStreamName: {Code: 400, ErrCode: 10141, Description: "sourced stream name is invalid"},
JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"},
JSStorageResourcesExceededErr: {Code: 500, ErrCode: 10047, Description: "insufficient storage resources available"},
JSStreamAssignmentErrF: {Code: 500, ErrCode: 10048, Description: "{err}"},
Expand Down Expand Up @@ -1385,6 +1393,16 @@ func NewJSMirrorConsumerSetupFailedError(err error, opts ...ErrorOption) *ApiErr
}
}

// NewJSMirrorInvalidStreamNameError creates a new JSMirrorInvalidStreamName error: "mirrored stream name is invalid"
func NewJSMirrorInvalidStreamNameError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMirrorInvalidStreamName]
}

// NewJSMirrorMaxMessageSizeTooBigError creates a new JSMirrorMaxMessageSizeTooBigErr error: "stream mirror must have max message size >= source"
func NewJSMirrorMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down Expand Up @@ -1599,6 +1617,16 @@ func NewJSSourceDuplicateDetectedError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSSourceDuplicateDetected]
}

// NewJSSourceInvalidStreamNameError creates a new JSSourceInvalidStreamName error: "sourced stream name is invalid"
func NewJSSourceInvalidStreamNameError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSSourceInvalidStreamName]
}

// NewJSSourceMaxMessageSizeTooBigError creates a new JSSourceMaxMessageSizeTooBigErr error: "stream source must have max message size >= target"
func NewJSSourceMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
4 changes: 2 additions & 2 deletions server/jetstream_super_cluster_test.go
Expand Up @@ -550,7 +550,7 @@ func TestJetStreamSuperClusterConnectionCount(t *testing.T) {
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "src",
Sources: []*nats.StreamSource{{Name: "foo.1"}, {Name: "foo.2"}},
Sources: []*nats.StreamSource{{Name: "foo1"}, {Name: "foo2"}},
Replicas: 3})
require_NoError(t, err)
}()
Expand All @@ -561,7 +561,7 @@ func TestJetStreamSuperClusterConnectionCount(t *testing.T) {
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "mir",
Mirror: &nats.StreamSource{Name: "foo.2"},
Mirror: &nats.StreamSource{Name: "foo2"},
Replicas: 3})
require_NoError(t, err)
}()
Expand Down
4 changes: 4 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -11372,6 +11372,10 @@ func TestJetStreamSourceBasics(t *testing.T) {
}
}

if _, err := js.AddStream(&nats.StreamConfig{Name: "test", Sources: []*nats.StreamSource{{Name: ""}}}); err.Error() == "source stream name is invalid" {
t.Fatal("Expected a source stream name is invalid error")
}

for _, sname := range []string{"foo", "bar", "baz"} {
if _, err := js.AddStream(&nats.StreamConfig{Name: sname}); err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down
6 changes: 6 additions & 0 deletions server/stream.go
Expand Up @@ -1164,6 +1164,9 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
// Do not perform checks if External is provided, as it could lead to
// checking against itself (if sourced stream name is the same on different JetStream)
if cfg.Mirror.External == nil {
if !isValidName(cfg.Mirror.Name) {
return StreamConfig{}, NewJSMirrorInvalidStreamNameError()
}
// We do not require other stream to exist anymore, but if we can see it check payloads.
exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name)
if len(subs) > 0 {
Expand Down Expand Up @@ -1203,6 +1206,9 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
// check for duplicates
var iNames = make(map[string]struct{})
for _, src := range cfg.Sources {
if !isValidName(src.Name) {
return StreamConfig{}, NewJSSourceInvalidStreamNameError()
}
if _, ok := iNames[src.composeIName()]; !ok {
iNames[src.composeIName()] = struct{}{}
} else {
Expand Down