Skip to content

Commit

Permalink
feat: Add scaler for Google Cloud Tasks (kedacore#4834)
Browse files Browse the repository at this point in the history
* feature: add cloud tasks scaler

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* Add cloud tasks e2e test

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: Return files to the original state

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: static checks

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: more static checks fixed

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: add location for queue

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: add correct command to create test tasks in queue

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: add fixes to test and add pod identity test

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: specify location also in tasks

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: correct indentation and location for purge

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: correct naming, add package correctly to identity tests

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: change test name

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: add gcp as prefix for naming for clarity

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: correct problem in test when changing name in struct

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: order in changelog

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: another order change in changelog

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: more renaming

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: put it in a new section

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: try a new order for the changelog

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: delete unneeded line

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: into the new Changelog section

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

* fix: another order fix

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

---------

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>
Signed-off-by: Jose Maria Alvarez Fernandez <52029309+jmalvarezf-lmes@users.noreply.github.com>
  • Loading branch information
jmalvarezf-lmes committed Sep 11, 2023
1 parent c94c20f commit 7daa09a
Show file tree
Hide file tree
Showing 6 changed files with 854 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: Introduce new Google Cloud Tasks scaler functionality to scale based on the queue length ([#3613](https://github.com/kedacore/keda/issues/3613))
- **AWS SQS Scaler**: Support for scaling to include delayed messages. ([#4377](https://github.com/kedacore/keda/issues/4377))
- **Governance**: KEDA transitioned to CNCF Graduated project ([#63](https://github.com/kedacore/governance/issues/63))

### Improvements

- **General**: Add more events for user checking ([#796](https://github.com/kedacore/keda/issues/3764))
- **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))
- **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726))
Expand Down
184 changes: 184 additions & 0 deletions pkg/scalers/gcp_cloud_tasks_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package scalers

import (
"context"
"fmt"
"strconv"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
cloudTasksStackDriverQueueSize = "cloudtasks.googleapis.com/queue/depth"

cloudTaskDefaultValue = 100
)

type gcpCloudTasksScaler struct {
client *StackDriverClient
metricType v2.MetricTargetType
metadata *gcpCloudTaskMetadata
logger logr.Logger
}

type gcpCloudTaskMetadata struct {
value float64
activationValue float64

queueName string
projectID string
gcpAuthorization *gcpAuthorizationMetadata
scalerIndex int
}

// NewCloudTaskScaler creates a new cloudTaskScaler
func NewGcpCloudTasksScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "gcp_cloud_tasks_scaler")

meta, err := parseGcpCloudTasksMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing Cloud Tasks metadata: %w", err)
}

return &gcpCloudTasksScaler{
metricType: metricType,
metadata: meta,
logger: logger,
}, nil
}

func parseGcpCloudTasksMetadata(config *ScalerConfig) (*gcpCloudTaskMetadata, error) {
meta := gcpCloudTaskMetadata{value: cloudTaskDefaultValue}

value, valuePresent := config.TriggerMetadata["value"]

if valuePresent {
triggerValue, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil, fmt.Errorf("value parsing error %w", err)
}
meta.value = triggerValue
}

if val, ok := config.TriggerMetadata["queueName"]; ok {
if val == "" {
return nil, fmt.Errorf("no queue name given")
}

meta.queueName = val
} else {
return nil, fmt.Errorf("no queue name given")
}

meta.activationValue = 0
if val, ok := config.TriggerMetadata["activationValue"]; ok {
activationValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationValue parsing error %w", err)
}
meta.activationValue = activationValue
}

if val, ok := config.TriggerMetadata["projectID"]; ok {
if val == "" {
return nil, fmt.Errorf("no project id given")
}

meta.projectID = val
} else {
return nil, fmt.Errorf("no project id given")
}

auth, err := getGCPAuthorization(config)
if err != nil {
return nil, err
}
meta.gcpAuthorization = auth
meta.scalerIndex = config.ScalerIndex
return &meta, nil
}

func (s *gcpCloudTasksScaler) Close(context.Context) error {
if s.client != nil {
err := s.client.metricsClient.Close()
s.client = nil
if err != nil {
s.logger.Error(err, "error closing StackDriver client")
}
}

return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *gcpCloudTasksScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.queueName))),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.value),
}

// Create the metric spec for the HPA
metricSpec := v2.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}

return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity connects to Stack Driver and finds the size of the cloud task
func (s *gcpCloudTasksScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
metricType := cloudTasksStackDriverQueueSize

value, err := s.getMetrics(ctx, metricType)
if err != nil {
s.logger.Error(err, "error getting metric", "metricType", metricType)
return []external_metrics.ExternalMetricValue{}, false, err
}

metric := GenerateMetricInMili(metricName, value)

return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationValue, nil
}

func (s *gcpCloudTasksScaler) setStackdriverClient(ctx context.Context) error {
var client *StackDriverClient
var err error
if s.metadata.gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
}

if err != nil {
return err
}
s.client = client
return nil
}

// getMetrics gets metric type value from stackdriver api
func (s *gcpCloudTasksScaler) getMetrics(ctx context.Context, metricType string) (float64, error) {
if s.client == nil {
err := s.setStackdriverClient(ctx)
if err != nil {
return -1, err
}
}
filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.queueName + `"`

// Cloud Tasks metrics are collected every 60 seconds so no need to aggregate them.
// See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudtasks
return s.client.GetMetrics(ctx, filter, s.metadata.projectID, nil)
}
79 changes: 79 additions & 0 deletions pkg/scalers/gcp_cloud_tasks_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package scalers

import (
"context"
"testing"

"github.com/go-logr/logr"
)

var testGcpCloudTasksResolvedEnv = map[string]string{
"SAMPLE_CREDS": "{}",
}

type parseGcpCloudTasksMetadataTestData struct {
authParams map[string]string
metadata map[string]string
isError bool
}

type gcpCloudTasksMetricIdentifier struct {
metadataTestData *parseGcpCloudTasksMetadataTestData
scalerIndex int
name string
}

var testGcpCloudTasksMetadata = []parseGcpCloudTasksMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "5"}, false},
// missing subscriptionName
{nil, map[string]string{"queueName": "", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing credentials
{nil, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject", "credentialsFromEnv": ""}, true},
// malformed subscriptionSize
{nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed mode
{nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed activationTargetValue
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "AA"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject"}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectID": "myproject"}, true},
// properly formed float value and activationTargetValue
{nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectID": "myproject"}, false},
}

var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{
{&testGcpCloudTasksMetadata[1], 0, "s0-gcp-ct-myQueue"},
{&testGcpCloudTasksMetadata[1], 1, "s1-gcp-ct-myQueue"},
}

func TestGcpCloudTasksParseMetadata(t *testing.T) {
for _, testData := range testGcpCloudTasksMetadata {
_, err := parseGcpCloudTasksMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestGcpCloudTasksGetMetricSpecForScaling(t *testing.T) {
for _, testData := range gcpCloudTasksMetricIdentifiers {
meta, err := parseGcpCloudTasksMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGcpCloudTasksScaler := gcpCloudTasksScaler{nil, "", meta, logr.Discard()}

metricSpec := mockGcpCloudTasksScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scalers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
return scalers.NewExternalMockScaler(config)
case "external-push":
return scalers.NewExternalPushScaler(config)
case "gcp-cloudtasks":
return scalers.NewGcpCloudTasksScaler(config)
case "gcp-pubsub":
return scalers.NewPubSubScaler(config)
case "gcp-stackdriver":
Expand Down

0 comments on commit 7daa09a

Please sign in to comment.