Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/awscloudwatchlogsexporter] Improve performance of the awscloudwatchlogs exporter #26692

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
406ce61
Chore: Rename PusherKey to StreamKey
rapphil Sep 13, 2023
7ee35f6
Chore: Move StreamKey inside event
rapphil Sep 13, 2023
1e55b06
Chore: Refactor to properly initialize the event
rapphil Sep 13, 2023
7d4c305
Feat: Add cwlogs Pusher able to send events to multiple streams
rapphil Sep 13, 2023
10b4638
Feat: use MultiStream pusher in awscloudwatchlogsexporter
rapphil Sep 14, 2023
8b3e0f4
Feat: Add support to consumers in cloudwatchlogs
rapphil Sep 13, 2023
1d9e0e2
Feat: Remove locks from cloudwatch logs pusher
rapphil Sep 14, 2023
eb38388
Feat: Remove throttling limiter
rapphil Sep 13, 2023
069f471
Feat: Stop using sequence token
rapphil Sep 14, 2023
51ddd00
Feat: Send log events directly to the Pusher
rapphil Sep 14, 2023
034b8ca
chore: Update readme
rapphil Sep 14, 2023
fb01288
Chore: update changelog
rapphil Sep 14, 2023
d189d48
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Sep 14, 2023
caf1be6
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Sep 14, 2023
0b4dbea
Chore: fix wrong dependency
rapphil Sep 14, 2023
86e2c38
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Sep 18, 2023
bd7d4a1
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Sep 21, 2023
c9f98b1
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Sep 21, 2023
e6383ab
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 3, 2023
ccd669c
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 6, 2023
8f183e2
Update internal/aws/cwlogs/pusher.go
rapphil Oct 6, 2023
fb93d1b
Use queue validation from exporterhelper
rapphil Oct 6, 2023
698e581
Simplify error handling and avoid uncessary code
rapphil Oct 9, 2023
0cc848c
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 9, 2023
eb608f6
Chore: Refactor pusher and cwlog_client to not use stream token
rapphil Oct 9, 2023
0489473
Chore: minor refactoring
rapphil Oct 9, 2023
f20ffae
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 9, 2023
dec0f2d
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 10, 2023
c799e53
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 22 additions & 0 deletions .chloggen/parallel-awscloudwatchlogsexporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe breaking is ok? since the config is changed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it is considered breaking in this case. The config is extended but the default behavior stays the same. @rapphil can you confirm this statement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct, we are extending the configuration and keeping the previous behaviour unchanged. That is why I don't consider this a breaking change.


# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awscloudwatchlogsexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Improve the performance of the awscloudwatchlogsexporter"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26692]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Improve the performance by adding support to multiple consumers and removing locks and limiters that are no longer
necessary.
7 changes: 5 additions & 2 deletions exporter/awscloudwatchlogsexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ The following settings can be optionally configured:
- `endpoint`: The CloudWatch Logs service endpoint which the requests are forwarded to. [See the CloudWatch Logs endpoints](https://docs.aws.amazon.com/general/latest/gr/cwl_region.html) for a list.
- `log_retention`: LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653.
- `tags`: Tags is the option to set tags for the CloudWatch Log Group. If specified, please add at most 50 tags. Input is a string to string map like so: { 'key': 'value' }. Keys must be between 1-128 characters and follow the regex pattern: `^([\p{L}\p{Z}\p{N}_.:/=+\-@]+)$`(alphanumerics, whitespace, and _.:/=+-!). Values must be between 1-256 characters and follow the regex pattern: `^([\p{L}\p{Z}\p{N}_.:/=+\-@]*)$`(alphanumerics, whitespace, and _.:/=+-!). [Link to tagging restrictions](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html#:~:text=Required%3A%20Yes-,tags,-The%20key%2Dvalue)
- `raw_log`: Boolean default false. If set to true, only the log message will be exported to CloudWatch Logs. This needs to be set to true for [EMF logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html).
- `raw_log`: Boolean default false. If set to true, only the log message will be exported to CloudWatch Logs. This needs to be set to true for [EMF logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html).
- `sending_queue`: [Parameters for the sending queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), where you can control parallelism and the size of the sending buffer. Obs.: this component will always have a sending queue enabled.
- `num_consumers`: Number of consumers that will consume from the sending queue. This parameter controls how many consumers will consume from the sending queue in parallel.
- `queue_size`: Maximum number of batches kept in memory before dropping; ignored if enabled is false

### Examples

Expand Down Expand Up @@ -63,7 +66,7 @@ exporters:
- If the log group and/or log stream are specified in an EMF log, that EMF log will be exported to that log group and/or log stream (i.e. ignores the log group and log stream defined in the configuration)
- The log group and log stream will also be created automatically if they do not already exist.
- Example of an EMF log with log group and log stream:
```json
```json
{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo", "LogStreamName": "Bar", "CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}
```
- Resource ARNs (Amazon Resource Name (ARN) of the AWS resource running the collector) are currently not supported with the CloudWatch Logs Exporter.
32 changes: 13 additions & 19 deletions exporter/awscloudwatchlogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ type Config struct {
// Values must be between 1-256 characters and follow the regex pattern: ^([\p{L}\p{Z}\p{N}_.:/=+\-@]*)$
Tags map[string]*string `mapstructure:"tags"`

// QueueSettings is a subset of exporterhelper.QueueSettings,
// because only QueueSize is user-settable due to how AWS CloudWatch API works
QueueSettings QueueSettings `mapstructure:"sending_queue"`
// Queue settings frm the exporterhelper
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

logger *zap.Logger

Expand All @@ -54,11 +53,6 @@ type Config struct {
RawLog bool `mapstructure:"raw_log,omitempty"`
}

type QueueSettings struct {
// QueueSize set the length of the sending queue
QueueSize int `mapstructure:"queue_size"`
}

var _ component.Config = (*Config)(nil)

// Validate config
Expand All @@ -69,23 +63,23 @@ func (config *Config) Validate() error {
if config.LogStreamName == "" {
return errors.New("'log_stream_name' must be set")
}
if config.QueueSettings.QueueSize < 1 {
return errors.New("'sending_queue.queue_size' must be 1 or greater")

if err := config.QueueSettings.Validate(); err != nil {
return err
}

// TODO: once QueueSettings.Validate validate the number of consumers remove the next
// verification

if config.QueueSettings.NumConsumers < 1 {
return errors.New("'sending_queue.num_consumers' must be 1 or greater")
}

if retErr := cwlogs.ValidateRetentionValue(config.LogRetention); retErr != nil {
return retErr
}
return cwlogs.ValidateTagsInput(config.Tags)

}

func (config *Config) enforcedQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
// due to the sequence token, there can be only one request in flight
NumConsumers: 1,
QueueSize: config.QueueSettings.QueueSize,
}
}

// TODO(jbd): Add ARN role to config.
37 changes: 23 additions & 14 deletions exporter/awscloudwatchlogsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ func TestLoadConfig(t *testing.T) {
LogStreamName: "testing",
Endpoint: "",
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
},
},
Expand All @@ -61,14 +63,20 @@ func TestLoadConfig(t *testing.T) {
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
LogGroupName: "test-2",
LogStreamName: "testing",
QueueSettings: QueueSettings{
QueueSize: 2,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: 2,
},
},
},
{
id: component.NewIDWithName(metadata.Type, "invalid_queue_size"),
errorMessage: "'sending_queue.queue_size' must be 1 or greater",
errorMessage: "queue size must be positive",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_num_consumers"),
errorMessage: "'sending_queue.num_consumers' must be 1 or greater",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_required_field_stream"),
Expand All @@ -78,10 +86,6 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "invalid_required_field_group"),
errorMessage: "'log_group_name' must be set",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_queue_setting"),
errorMessage: `'sending_queue' has invalid keys: enabled, num_consumers`,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -113,8 +117,10 @@ func TestRetentionValidateCorrect(t *testing.T) {
Endpoint: "",
LogRetention: 365,
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
}
assert.NoError(t, component.ValidateConfig(cfg))
Expand All @@ -130,7 +136,8 @@ func TestRetentionValidateWrong(t *testing.T) {
Endpoint: "",
LogRetention: 366,
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
}
Expand Down Expand Up @@ -213,8 +220,10 @@ func TestValidateTags(t *testing.T) {
Endpoint: "",
Tags: tt.tags,
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
}
if tt.errorMessage != "" {
Expand Down
106 changes: 32 additions & 74 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"sync"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -32,8 +32,7 @@ type cwlExporter struct {
retryCount int
collectorID string
svcStructuredLog *cwlogs.Client
pusherMap map[cwlogs.PusherKey]cwlogs.Pusher
pusherMapLock sync.RWMutex
pusherFactory cwlogs.MultiStreamPusherFactory
}

type awsMetadata struct {
Expand Down Expand Up @@ -68,24 +67,16 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*cwlExporter
return nil, err
}

pusherKey := cwlogs.PusherKey{
LogGroupName: expConfig.LogGroupName,
LogStreamName: expConfig.LogStreamName,
}

pusher := cwlogs.NewPusher(pusherKey, *awsConfig.MaxRetries, *svcStructuredLog, params.Logger)

pusherMap := make(map[cwlogs.PusherKey]cwlogs.Pusher)

pusherMap[pusherKey] = pusher
logStreamManager := cwlogs.NewLogStreamManager(*svcStructuredLog)
multiStreamPusherFactory := cwlogs.NewMultiStreamPusherFactory(logStreamManager, *svcStructuredLog, params.Logger)

logsExporter := &cwlExporter{
svcStructuredLog: svcStructuredLog,
Config: expConfig,
logger: params.Logger,
retryCount: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusherMap: pusherMap,
pusherFactory: multiStreamPusherFactory,
}
return logsExporter, nil
}
Expand All @@ -101,82 +92,44 @@ func newCwLogsExporter(config component.Config, params exp.CreateSettings) (exp.
params,
config,
logsPusher.consumeLogs,
exporterhelper.WithQueue(expConfig.enforcedQueueSettings()),
exporterhelper.WithQueue(expConfig.QueueSettings),
exporterhelper.WithRetry(expConfig.RetrySettings),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithShutdown(logsPusher.shutdown),
)
}

func (e *cwlExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
logEvents, _ := logsToCWLogs(e.logger, ld, e.Config)
if len(logEvents) == 0 {
return nil
}
pusher := e.pusherFactory.CreateMultiStreamPusher()
var errs error

logPushersUsed := make(map[cwlogs.PusherKey]cwlogs.Pusher)
for _, logEvent := range logEvents {
pusherKey := cwlogs.PusherKey{
LogGroupName: logEvent.LogGroupName,
LogStreamName: logEvent.LogStreamName,
}
cwLogsPusher := e.getLogPusher(logEvent)
e.logger.Debug("Adding log event", zap.Any("event", logEvent))
err := cwLogsPusher.AddLogEntry(logEvent)
if err != nil {
e.logger.Error("Failed ", zap.Int("num_of_events", len(logEvents)))
}
logPushersUsed[pusherKey] = cwLogsPusher
}
var flushErrArray []error
for _, pusher := range logPushersUsed {
flushErr := pusher.ForceFlush()
if flushErr != nil {
e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr))
flushErrArray = append(flushErrArray, flushErr)
}
}
if len(flushErrArray) != 0 {
errorString := ""
for _, err := range flushErrArray {
errorString += err.Error()
}
return errors.New(errorString)
}
return nil
}
err := pushLogsToCWLogs(e.logger, ld, e.Config, pusher)

func (e *cwlExporter) getLogPusher(logEvent *cwlogs.Event) cwlogs.Pusher {
e.pusherMapLock.Lock()
defer e.pusherMapLock.Unlock()
pusherKey := cwlogs.PusherKey{
LogGroupName: logEvent.LogGroupName,
LogStreamName: logEvent.LogStreamName,
if err != nil {
errs = errors.Join(errs, fmt.Errorf("Error pushing logs: %w", err))
}
if e.pusherMap[pusherKey] == nil {
pusher := cwlogs.NewPusher(pusherKey, e.retryCount, *e.svcStructuredLog, e.logger)
e.pusherMap[pusherKey] = pusher

err = pusher.ForceFlush()

if err != nil {
errs = errors.Join(errs, fmt.Errorf("Error flushing logs: %w", err))
}
return e.pusherMap[pusherKey]

return errs
bryan-aguilar marked this conversation as resolved.
Show resolved Hide resolved
}

func (e *cwlExporter) shutdown(_ context.Context) error {
if e.pusherMap != nil {
for _, pusher := range e.pusherMap {
pusher.ForceFlush()
}
}
return nil
}

func logsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config) ([]*cwlogs.Event, int) {
func pushLogsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config, pusher cwlogs.Pusher) error {
n := ld.ResourceLogs().Len()

if n == 0 {
return []*cwlogs.Event{}, 0
return nil
}

var dropped int
var out []*cwlogs.Event
var errs error

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
Expand All @@ -192,14 +145,17 @@ func logsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config) ([]*cwlogs.E
event, err := logToCWLog(resourceAttrs, log, config)
if err != nil {
logger.Debug("Failed to convert to CloudWatch Log", zap.Error(err))
dropped++
} else {
out = append(out, event)
err := pusher.AddLogEntry(event)
if err != nil {
errs = errors.Join(errs, err)
}
}
}
}
}
return out, dropped

return errs
}

type cwLogBody struct {
Expand Down Expand Up @@ -268,8 +224,10 @@ func logToCWLog(resourceAttrs map[string]interface{}, log plog.LogRecord, config
Timestamp: aws.Int64(int64(log.Timestamp()) / int64(time.Millisecond)), // in milliseconds
Message: aws.String(string(bodyJSON)),
},
LogGroupName: logGroupName,
LogStreamName: logStreamName,
StreamKey: cwlogs.StreamKey{
LogGroupName: logGroupName,
LogStreamName: logStreamName,
},
GeneratedTime: time.Now(),
}, nil
}
Expand Down