Skip to content

Commit

Permalink
[exporter/awscloudwatchlogsexporter] Improve performance of the awscl…
Browse files Browse the repository at this point in the history
…oudwatchlogs exporter (#26692)

Adds support to the to parallelism in the
awscloudwatchlogs exporter by leveraging the [exporter
helper](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md).

In this PR, we are adding support to the `num_consumers` configuration
in the `sending_queue`. This will allow users to specify the number of
consumers that will consume from the sending_queue in parallel.

It is possible and straightforward to use this approach because
CloudWatch logs [no longer requires that you use a token to control
access to the stream that you are writing
to](https://aws.amazon.com/about-aws/whats-new/2023/01/amazon-cloudwatch-logs-log-stream-transaction-quota-sequencetoken-requirement/).
You can write to the same stream in parallel.

To achieve this, this PR does the following:
* Create Pusher that is able to push to multiple streams at the same
time.
* Move lifecycle of the Pusher to the function that is used to consume
from the sending queue. This allows you to safely send to multiple
streams at the same time without any resource contention since each call
to consume logs will not share resources with others that are happening
in parallel (one exception is the creation of log streams).

Besides that I analyzed the code and removed other limitations:
* locks that were not necessary
* Limiter that was used to limit the number of requests per stream to 5
per second. [The TPS is much higher now and is per
account.](https://aws.amazon.com/about-aws/whats-new/2023/01/amazon-cloudwatch-logs-log-stream-transaction-quota-sequencetoken-requirement/)

** How to review this PR: **

The first 3 commits in this PR were used to refactor the code before
making the real changes. Please use the commits to simplify the review
process.

**Link to tracking Issue:** #26360

**Testing:**

- Unit tests were added.
- Tested locally sending logs to cloudwatch logs.

**Documentation:** Documentation was added describing the new
parameters.

---------

Signed-off-by: Raphael Silva <rapphil@gmail.com>
Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
  • Loading branch information
rapphil and Aneurysm9 committed Oct 11, 2023
1 parent 6ce0193 commit 8bb5533
Show file tree
Hide file tree
Showing 16 changed files with 522 additions and 390 deletions.
22 changes: 22 additions & 0 deletions .chloggen/parallel-awscloudwatchlogsexporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awscloudwatchlogsexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Improve the performance of the awscloudwatchlogsexporter"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26692]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Improve the performance by adding support to multiple consumers and removing locks and limiters that are no longer
necessary.
7 changes: 5 additions & 2 deletions exporter/awscloudwatchlogsexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ The following settings can be optionally configured:
- `endpoint`: The CloudWatch Logs service endpoint which the requests are forwarded to. [See the CloudWatch Logs endpoints](https://docs.aws.amazon.com/general/latest/gr/cwl_region.html) for a list.
- `log_retention`: LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653.
- `tags`: Tags is the option to set tags for the CloudWatch Log Group. If specified, please add at most 50 tags. Input is a string to string map like so: { 'key': 'value' }. Keys must be between 1-128 characters and follow the regex pattern: `^([\p{L}\p{Z}\p{N}_.:/=+\-@]+)$`(alphanumerics, whitespace, and _.:/=+-!). Values must be between 1-256 characters and follow the regex pattern: `^([\p{L}\p{Z}\p{N}_.:/=+\-@]*)$`(alphanumerics, whitespace, and _.:/=+-!). [Link to tagging restrictions](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html#:~:text=Required%3A%20Yes-,tags,-The%20key%2Dvalue)
- `raw_log`: Boolean default false. If set to true, only the log message will be exported to CloudWatch Logs. This needs to be set to true for [EMF logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html).
- `raw_log`: Boolean default false. If set to true, only the log message will be exported to CloudWatch Logs. This needs to be set to true for [EMF logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html).
- `sending_queue`: [Parameters for the sending queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), where you can control parallelism and the size of the sending buffer. Obs.: this component will always have a sending queue enabled.
- `num_consumers`: Number of consumers that will consume from the sending queue. This parameter controls how many consumers will consume from the sending queue in parallel.
- `queue_size`: Maximum number of batches kept in memory before dropping; ignored if enabled is false

### Examples

Expand Down Expand Up @@ -63,7 +66,7 @@ exporters:
- If the log group and/or log stream are specified in an EMF log, that EMF log will be exported to that log group and/or log stream (i.e. ignores the log group and log stream defined in the configuration)
- The log group and log stream will also be created automatically if they do not already exist.
- Example of an EMF log with log group and log stream:
```json
```json
{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo", "LogStreamName": "Bar", "CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}
```
- Resource ARNs (Amazon Resource Name (ARN) of the AWS resource running the collector) are currently not supported with the CloudWatch Logs Exporter.
32 changes: 13 additions & 19 deletions exporter/awscloudwatchlogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ type Config struct {
// Values must be between 1-256 characters and follow the regex pattern: ^([\p{L}\p{Z}\p{N}_.:/=+\-@]*)$
Tags map[string]*string `mapstructure:"tags"`

// QueueSettings is a subset of exporterhelper.QueueSettings,
// because only QueueSize is user-settable due to how AWS CloudWatch API works
QueueSettings QueueSettings `mapstructure:"sending_queue"`
// Queue settings frm the exporterhelper
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

logger *zap.Logger

Expand All @@ -54,11 +53,6 @@ type Config struct {
RawLog bool `mapstructure:"raw_log,omitempty"`
}

type QueueSettings struct {
// QueueSize set the length of the sending queue
QueueSize int `mapstructure:"queue_size"`
}

var _ component.Config = (*Config)(nil)

// Validate config
Expand All @@ -69,23 +63,23 @@ func (config *Config) Validate() error {
if config.LogStreamName == "" {
return errors.New("'log_stream_name' must be set")
}
if config.QueueSettings.QueueSize < 1 {
return errors.New("'sending_queue.queue_size' must be 1 or greater")

if err := config.QueueSettings.Validate(); err != nil {
return err
}

// TODO: once QueueSettings.Validate validate the number of consumers remove the next
// verification

if config.QueueSettings.NumConsumers < 1 {
return errors.New("'sending_queue.num_consumers' must be 1 or greater")
}

if retErr := cwlogs.ValidateRetentionValue(config.LogRetention); retErr != nil {
return retErr
}
return cwlogs.ValidateTagsInput(config.Tags)

}

func (config *Config) enforcedQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
// due to the sequence token, there can be only one request in flight
NumConsumers: 1,
QueueSize: config.QueueSettings.QueueSize,
}
}

// TODO(jbd): Add ARN role to config.
37 changes: 23 additions & 14 deletions exporter/awscloudwatchlogsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ func TestLoadConfig(t *testing.T) {
LogStreamName: "testing",
Endpoint: "",
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
},
},
Expand All @@ -61,14 +63,20 @@ func TestLoadConfig(t *testing.T) {
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
LogGroupName: "test-2",
LogStreamName: "testing",
QueueSettings: QueueSettings{
QueueSize: 2,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: 2,
},
},
},
{
id: component.NewIDWithName(metadata.Type, "invalid_queue_size"),
errorMessage: "'sending_queue.queue_size' must be 1 or greater",
errorMessage: "queue size must be positive",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_num_consumers"),
errorMessage: "'sending_queue.num_consumers' must be 1 or greater",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_required_field_stream"),
Expand All @@ -78,10 +86,6 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "invalid_required_field_group"),
errorMessage: "'log_group_name' must be set",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_queue_setting"),
errorMessage: `'sending_queue' has invalid keys: enabled, num_consumers`,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -113,8 +117,10 @@ func TestRetentionValidateCorrect(t *testing.T) {
Endpoint: "",
LogRetention: 365,
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
}
assert.NoError(t, component.ValidateConfig(cfg))
Expand All @@ -130,7 +136,8 @@ func TestRetentionValidateWrong(t *testing.T) {
Endpoint: "",
LogRetention: 366,
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
}
Expand Down Expand Up @@ -213,8 +220,10 @@ func TestValidateTags(t *testing.T) {
Endpoint: "",
Tags: tt.tags,
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
}
if tt.errorMessage != "" {
Expand Down
106 changes: 32 additions & 74 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"sync"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -32,8 +32,7 @@ type cwlExporter struct {
retryCount int
collectorID string
svcStructuredLog *cwlogs.Client
pusherMap map[cwlogs.PusherKey]cwlogs.Pusher
pusherMapLock sync.RWMutex
pusherFactory cwlogs.MultiStreamPusherFactory
}

type awsMetadata struct {
Expand Down Expand Up @@ -68,24 +67,16 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*cwlExporter
return nil, err
}

pusherKey := cwlogs.PusherKey{
LogGroupName: expConfig.LogGroupName,
LogStreamName: expConfig.LogStreamName,
}

pusher := cwlogs.NewPusher(pusherKey, *awsConfig.MaxRetries, *svcStructuredLog, params.Logger)

pusherMap := make(map[cwlogs.PusherKey]cwlogs.Pusher)

pusherMap[pusherKey] = pusher
logStreamManager := cwlogs.NewLogStreamManager(*svcStructuredLog)
multiStreamPusherFactory := cwlogs.NewMultiStreamPusherFactory(logStreamManager, *svcStructuredLog, params.Logger)

logsExporter := &cwlExporter{
svcStructuredLog: svcStructuredLog,
Config: expConfig,
logger: params.Logger,
retryCount: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusherMap: pusherMap,
pusherFactory: multiStreamPusherFactory,
}
return logsExporter, nil
}
Expand All @@ -101,82 +92,44 @@ func newCwLogsExporter(config component.Config, params exp.CreateSettings) (exp.
params,
config,
logsPusher.consumeLogs,
exporterhelper.WithQueue(expConfig.enforcedQueueSettings()),
exporterhelper.WithQueue(expConfig.QueueSettings),
exporterhelper.WithRetry(expConfig.RetrySettings),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithShutdown(logsPusher.shutdown),
)
}

func (e *cwlExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
logEvents, _ := logsToCWLogs(e.logger, ld, e.Config)
if len(logEvents) == 0 {
return nil
}
pusher := e.pusherFactory.CreateMultiStreamPusher()
var errs error

logPushersUsed := make(map[cwlogs.PusherKey]cwlogs.Pusher)
for _, logEvent := range logEvents {
pusherKey := cwlogs.PusherKey{
LogGroupName: logEvent.LogGroupName,
LogStreamName: logEvent.LogStreamName,
}
cwLogsPusher := e.getLogPusher(logEvent)
e.logger.Debug("Adding log event", zap.Any("event", logEvent))
err := cwLogsPusher.AddLogEntry(logEvent)
if err != nil {
e.logger.Error("Failed ", zap.Int("num_of_events", len(logEvents)))
}
logPushersUsed[pusherKey] = cwLogsPusher
}
var flushErrArray []error
for _, pusher := range logPushersUsed {
flushErr := pusher.ForceFlush()
if flushErr != nil {
e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr))
flushErrArray = append(flushErrArray, flushErr)
}
}
if len(flushErrArray) != 0 {
errorString := ""
for _, err := range flushErrArray {
errorString += err.Error()
}
return errors.New(errorString)
}
return nil
}
err := pushLogsToCWLogs(e.logger, ld, e.Config, pusher)

func (e *cwlExporter) getLogPusher(logEvent *cwlogs.Event) cwlogs.Pusher {
e.pusherMapLock.Lock()
defer e.pusherMapLock.Unlock()
pusherKey := cwlogs.PusherKey{
LogGroupName: logEvent.LogGroupName,
LogStreamName: logEvent.LogStreamName,
if err != nil {
errs = errors.Join(errs, fmt.Errorf("Error pushing logs: %w", err))
}
if e.pusherMap[pusherKey] == nil {
pusher := cwlogs.NewPusher(pusherKey, e.retryCount, *e.svcStructuredLog, e.logger)
e.pusherMap[pusherKey] = pusher

err = pusher.ForceFlush()

if err != nil {
errs = errors.Join(errs, fmt.Errorf("Error flushing logs: %w", err))
}
return e.pusherMap[pusherKey]

return errs
}

func (e *cwlExporter) shutdown(_ context.Context) error {
if e.pusherMap != nil {
for _, pusher := range e.pusherMap {
pusher.ForceFlush()
}
}
return nil
}

func logsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config) ([]*cwlogs.Event, int) {
func pushLogsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config, pusher cwlogs.Pusher) error {
n := ld.ResourceLogs().Len()

if n == 0 {
return []*cwlogs.Event{}, 0
return nil
}

var dropped int
var out []*cwlogs.Event
var errs error

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
Expand All @@ -192,14 +145,17 @@ func logsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config) ([]*cwlogs.E
event, err := logToCWLog(resourceAttrs, log, config)
if err != nil {
logger.Debug("Failed to convert to CloudWatch Log", zap.Error(err))
dropped++
} else {
out = append(out, event)
err := pusher.AddLogEntry(event)
if err != nil {
errs = errors.Join(errs, err)
}
}
}
}
}
return out, dropped

return errs
}

type cwLogBody struct {
Expand Down Expand Up @@ -268,8 +224,10 @@ func logToCWLog(resourceAttrs map[string]interface{}, log plog.LogRecord, config
Timestamp: aws.Int64(int64(log.Timestamp()) / int64(time.Millisecond)), // in milliseconds
Message: aws.String(string(bodyJSON)),
},
LogGroupName: logGroupName,
LogStreamName: logStreamName,
StreamKey: cwlogs.StreamKey{
LogGroupName: logGroupName,
LogStreamName: logStreamName,
},
GeneratedTime: time.Now(),
}, nil
}
Expand Down

0 comments on commit 8bb5533

Please sign in to comment.