Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply stream compression from file store config #4072

Merged
merged 1 commit into from Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 29 additions & 0 deletions server/filestore.go
Expand Up @@ -107,6 +107,35 @@ func (alg StoreCompression) String() string {
}
}

func (alg StoreCompression) MarshalJSON() ([]byte, error) {
var str string
switch alg {
case S2Compression:
str = "s2"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest S2, I think that is how they refer to it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I stuck with the existing pattern from the JS schema here, which is that the values are all lower-case, although there is a separate String() function in the jsm.go PR that is for pretty-printing in the CLI ("S2 compression") so a CLI user never sees it.

case NoCompression:
str = "none"
default:
return nil, fmt.Errorf("unknown compression algorithm")
}
return json.Marshal(str)
}

func (alg *StoreCompression) UnmarshalJSON(b []byte) error {
var str string
if err := json.Unmarshal(b, &str); err != nil {
return err
}
switch str {
case "s2":
*alg = S2Compression
case "none":
*alg = NoCompression
default:
return fmt.Errorf("unknown compression algorithm")
}
return nil
}

// File ConsumerInfo is used for creating consumer stores.
type FileConsumerInfo struct {
Created time.Time
Expand Down
40 changes: 21 additions & 19 deletions server/stream.go
Expand Up @@ -38,25 +38,26 @@ import (
// StreamConfig will determine the name, subjects and retention policy
// for a given stream. If subjects is empty the name will be used.
type StreamConfig struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPer int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Discard DiscardPolicy `json:"discard"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPer int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Discard DiscardPolicy `json:"discard"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Compression StoreCompression `json:"compression"`

// Allow applying a subject transform to incoming messages before doing anything else
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
Expand Down Expand Up @@ -549,6 +550,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
fsCfg.StoreDir = storeDir
fsCfg.AsyncFlush = false
fsCfg.SyncInterval = 2 * time.Minute
fsCfg.Compression = config.Compression

if err := mset.setupStore(fsCfg); err != nil {
mset.stop(true, false)
Expand Down