Skip to content

Commit

Permalink
Apply stream compression from file store config (#4072)
Browse files Browse the repository at this point in the history
This PR fills the missing link between the stream config and the actual
compression state.

Related: nats-io/jsm.go#445, nats-io/natscli#762

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Apr 19, 2023
2 parents 1a329c7 + 85923c4 commit 4000f8b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 19 deletions.
29 changes: 29 additions & 0 deletions server/filestore.go
Expand Up @@ -107,6 +107,35 @@ func (alg StoreCompression) String() string {
}
}

func (alg StoreCompression) MarshalJSON() ([]byte, error) {
var str string
switch alg {
case S2Compression:
str = "s2"
case NoCompression:
str = "none"
default:
return nil, fmt.Errorf("unknown compression algorithm")
}
return json.Marshal(str)
}

func (alg *StoreCompression) UnmarshalJSON(b []byte) error {
var str string
if err := json.Unmarshal(b, &str); err != nil {
return err
}
switch str {
case "s2":
*alg = S2Compression
case "none":
*alg = NoCompression
default:
return fmt.Errorf("unknown compression algorithm")
}
return nil
}

// File ConsumerInfo is used for creating consumer stores.
type FileConsumerInfo struct {
Created time.Time
Expand Down
40 changes: 21 additions & 19 deletions server/stream.go
Expand Up @@ -38,25 +38,26 @@ import (
// StreamConfig will determine the name, subjects and retention policy
// for a given stream. If subjects is empty the name will be used.
type StreamConfig struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPer int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Discard DiscardPolicy `json:"discard"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPer int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Discard DiscardPolicy `json:"discard"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Compression StoreCompression `json:"compression"`

// Allow applying a subject transform to incoming messages before doing anything else
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
Expand Down Expand Up @@ -549,6 +550,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
fsCfg.StoreDir = storeDir
fsCfg.AsyncFlush = false
fsCfg.SyncInterval = 2 * time.Minute
fsCfg.Compression = config.Compression

if err := mset.setupStore(fsCfg); err != nil {
mset.stop(true, false)
Expand Down

0 comments on commit 4000f8b

Please sign in to comment.