Skip to content

Commit

Permalink
Add metadata to StreamConfig and ConsumerConfig
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Jan 26, 2023
1 parent 7c0c85e commit ee0b82e
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 1 deletion.
11 changes: 11 additions & 0 deletions server/consumer.go
Expand Up @@ -94,6 +94,9 @@ type ConsumerConfig struct {

// Don't add to general clients.
Direct bool `json:"direct,omitempty"`

// Metadata is additional metadata for the Consumer.
Metadata map[string]string `json:"metadata,omitempty"`
}

// SequenceInfo has both the consumer and the stream sequence and last activity.
Expand Down Expand Up @@ -554,6 +557,14 @@ func checkConsumerCfg(
}
}

var metadataLen int
for k, v := range config.Metadata {
metadataLen += len(k) + len(v)
}
if metadataLen > JSMaxMetadataLen {
return NewJSConsumerMetadataLengthError(fmt.Sprintf("%dKB", JSMaxMetadataLen/1024))
}

return nil
}

Expand Down
12 changes: 11 additions & 1 deletion server/errors.json
Expand Up @@ -1328,5 +1328,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerMetadataLengthErrF",
"code": 400,
"error_code": 10135,
"description": "consumer metadata exceeds maximum size of {limit}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
]
4 changes: 4 additions & 0 deletions server/jetstream_api.go
Expand Up @@ -330,6 +330,10 @@ func generateJSMappingTable(domain string) map[string]string {
// JSMaxDescription is the maximum description length for streams and consumers.
const JSMaxDescriptionLen = 4 * 1024

// JSMaxMetadataLen is the maximum length for streams an consumers metadata map.
// It's calculated by summing length of all keys an values.
const JSMaxMetadataLen = 128 * 1024

// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.
// Picked 255 as it seems to be a widely used file name limit
const JSMaxNameLen = 255
Expand Down
20 changes: 20 additions & 0 deletions server/jetstream_errors_generated.go
Expand Up @@ -131,6 +131,9 @@ const (
// JSConsumerMaxWaitingNegativeErr consumer max waiting needs to be positive
JSConsumerMaxWaitingNegativeErr ErrorIdentifier = 10087

// JSConsumerMetadataLengthErrF consumer metadata exceeds maximum size of {limit}
JSConsumerMetadataLengthErrF ErrorIdentifier = 10135

// JSConsumerNameContainsPathSeparatorsErr Consumer name can not contain path separators
JSConsumerNameContainsPathSeparatorsErr ErrorIdentifier = 10127

Expand Down Expand Up @@ -449,6 +452,7 @@ var (
JSConsumerMaxRequestBatchNegativeErr: {Code: 400, ErrCode: 10114, Description: "consumer max request batch needs to be > 0"},
JSConsumerMaxRequestExpiresToSmall: {Code: 400, ErrCode: 10115, Description: "consumer max request expires needs to be >= 1ms"},
JSConsumerMaxWaitingNegativeErr: {Code: 400, ErrCode: 10087, Description: "consumer max waiting needs to be positive"},
JSConsumerMetadataLengthErrF: {Code: 400, ErrCode: 10135, Description: "consumer metadata exceeds maximum size of {limit}"},
JSConsumerNameContainsPathSeparatorsErr: {Code: 400, ErrCode: 10127, Description: "Consumer name can not contain path separators"},
JSConsumerNameExistErr: {Code: 400, ErrCode: 10013, Description: "consumer name already in use"},
JSConsumerNameTooLongErrF: {Code: 400, ErrCode: 10102, Description: "consumer name is too long, maximum allowed is {max}"},
Expand Down Expand Up @@ -1027,6 +1031,22 @@ func NewJSConsumerMaxWaitingNegativeError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSConsumerMaxWaitingNegativeErr]
}

// NewJSConsumerMetadataLengthError creates a new JSConsumerMetadataLengthErrF error: "consumer metadata exceeds maximum size of {limit}"
func NewJSConsumerMetadataLengthError(limit interface{}, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

e := ApiErrors[JSConsumerMetadataLengthErrF]
args := e.toReplacerArgs([]interface{}{"{limit}", limit})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSConsumerNameContainsPathSeparatorsError creates a new JSConsumerNameContainsPathSeparatorsErr error: "Consumer name can not contain path separators"
func NewJSConsumerNameContainsPathSeparatorsError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
52 changes: 52 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -19333,3 +19333,55 @@ func TestJetStreamMsgBlkFailOnKernelFault(t *testing.T) {
friendlyBytes(si.Config.MaxBytes), friendlyBytes(int64(si.State.Bytes)))
}
}

func TestJetStreamConsumerAndStreamMetadata(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

metadata := map[string]string{"key": "value", "_nats_created_version": "2.9.11"}
acc := s.GlobalAccount()

// Check stream's first.
mset, err := acc.addStream(&StreamConfig{Name: "foo", Metadata: metadata})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
if cfg := mset.config(); !reflect.DeepEqual(metadata, cfg.Metadata) {
t.Fatalf("Expected a metadata of %q, got %q", metadata, cfg.Metadata)
}

// Now consumer
o, err := mset.addConsumer(&ConsumerConfig{
Metadata: metadata,
DeliverSubject: "to",
AckPolicy: AckNone})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
if cfg := o.config(); !reflect.DeepEqual(metadata, cfg.Metadata) {
t.Fatalf("Expected a metadata of %q, got %q", metadata, cfg.Metadata)
}

// Test max.
data := make([]byte, JSMaxMetadataLen/100)
rand.Read(data)
bigValue := base64.StdEncoding.EncodeToString(data)

bigMetadata := make(map[string]string, 101)
for i := 0; i < 101; i++ {
bigMetadata[fmt.Sprintf("key%d", i)] = bigValue
}

_, err = acc.addStream(&StreamConfig{Name: "bar", Metadata: bigMetadata})
if err == nil || !strings.Contains(err.Error(), "stream metadata exceeds") {
t.Fatalf("Expected an error but got none")
}

_, err = mset.addConsumer(&ConsumerConfig{
Metadata: bigMetadata,
DeliverSubject: "to",
AckPolicy: AckNone})
if err == nil || !strings.Contains(err.Error(), "consumer metadata exceeds") {
t.Fatalf("Expected an error but got none")
}
}
11 changes: 11 additions & 0 deletions server/stream.go
Expand Up @@ -80,6 +80,9 @@ type StreamConfig struct {
// AllowRollup allows messages to be placed into the system and purge
// all older messages using a special msg header.
AllowRollup bool `json:"allow_rollup_hdrs"`

// Metadata is additional metadata for the Stream.
Metadata map[string]string `json:"metadata,omitempty"`
}

// RePublish is for republishing messages once committed to a stream.
Expand Down Expand Up @@ -948,6 +951,14 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream description is too long, maximum allowed is %d", JSMaxDescriptionLen))
}

var metadataLen int
for k, v := range config.Metadata {
metadataLen += len(k) + len(v)
}
if metadataLen > JSMaxMetadataLen {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream metadata exceeds maximum size of %d bytes", JSMaxMetadataLen))
}

cfg := *config

// Make file the default.
Expand Down

0 comments on commit ee0b82e

Please sign in to comment.