Skip to content

Commit

Permalink
Add support for reservation affinity, dataproc oss metric collection …
Browse files Browse the repository at this point in the history
…and node-group configs, also support 'SPOT' under preemptibility (#6889) (#13231)

* Add support for reservation affinity under dataproc cluster

* make consume_reservation_type default as false.

* Remove  Default for values since Default is not allowed for TypeSet.

* space

* remove space

* Remove Default for key and values entry
Added test for reservation_affinity
Update dataproc doc

* fix test case

* fix test

* fix test

* Added metrics related tags.

* fix typo

* fix beta version

* fix beta version

* remove AtLeastOneOf if Required is set

* fix atLeastOneOf error

* fix build issue

* add test for metrics.

* disable test on beta

* fix test

* fix test

* add reservation_affinity and test case.

* update related html doc.

* add zone_uri fix test

* fix zone test error

* fix test

* refactory some entry

* fix typo

* fix typo

* fix typo

* update html doc

* fix test

* update based on comments.

* update html

* refactory

* refactory

* fix test

* fix test

* fix test

* fix test

* change test to catch error

* fix test

* change to list

* fix tests

* update text

* add test for SPOT option

* skip SPOT test for v1beta

* guard SPOT with v1 only

* text

* text

* add valid test for reservation affinity

* fix test

* add valid test for node affinity

* fix test

* fix tests

* increase node count

* fix test

* fix test

* adjust space

Signed-off-by: Modular Magician <magic-modules@google.com>

Signed-off-by: Modular Magician <magic-modules@google.com>
  • Loading branch information
modular-magician committed Dec 13, 2022
1 parent 9688bd7 commit dca9c11
Show file tree
Hide file tree
Showing 4 changed files with 454 additions and 1 deletion.
12 changes: 12 additions & 0 deletions .changelog/6889.txt
@@ -0,0 +1,12 @@
```release-note:enhancement
dataproc: added support for `reservation_affinity` in `google_dataproc_cluster`
```
```release-note:enhancement
dataproc: added support for `node_group_affinity` in `google_dataproc_cluster`
```
```release-note:enhancement
dataproc: added support for `dataproc_metric_config` in `google_dataproc_cluster`
```
```release-note:enhancement
dataproc: added support for `SPOT` option for `preemptibility` in `google_dataproc_cluster`
```
193 changes: 192 additions & 1 deletion google/resource_dataproc_cluster.go
Expand Up @@ -56,6 +56,8 @@ var (
"cluster_config.0.gce_cluster_config.0.internal_ip_only",
"cluster_config.0.gce_cluster_config.0.shielded_instance_config",
"cluster_config.0.gce_cluster_config.0.metadata",
"cluster_config.0.gce_cluster_config.0.reservation_affinity",
"cluster_config.0.gce_cluster_config.0.node_group_affinity",
}

schieldedInstanceConfigKeys = []string{
Expand All @@ -64,6 +66,12 @@ var (
"cluster_config.0.gce_cluster_config.0.shielded_instance_config.0.enable_integrity_monitoring",
}

reservationAffinityKeys = []string{
"cluster_config.0.gce_cluster_config.0.reservation_affinity.0.consume_reservation_type",
"cluster_config.0.gce_cluster_config.0.reservation_affinity.0.key",
"cluster_config.0.gce_cluster_config.0.reservation_affinity.0.values",
}

preemptibleWorkerDiskConfigKeys = []string{
"cluster_config.0.preemptible_worker_config.0.disk_config.0.num_local_ssds",
"cluster_config.0.preemptible_worker_config.0.disk_config.0.boot_disk_size_gb",
Expand All @@ -76,6 +84,15 @@ var (
"cluster_config.0.software_config.0.optional_components",
}

dataprocMetricConfigKeys = []string{
"cluster_config.0.dataproc_metric_config.0.metrics",
}

metricKeys = []string{
"cluster_config.0.dataproc_metric_config.0.metrics.0.metric_source",
"cluster_config.0.dataproc_metric_config.0.metrics.0.metric_overrides",
}

clusterConfigKeys = []string{
"cluster_config.0.staging_bucket",
"cluster_config.0.temp_bucket",
Expand All @@ -91,6 +108,7 @@ var (
"cluster_config.0.metastore_config",
"cluster_config.0.lifecycle_config",
"cluster_config.0.endpoint_config",
"cluster_config.0.dataproc_metric_config",
}
)

Expand Down Expand Up @@ -627,6 +645,62 @@ func resourceDataprocCluster() *schema.Resource {
},
},
},

"reservation_affinity": {
Type: schema.TypeList,
Optional: true,
AtLeastOneOf: gceClusterConfigKeys,
Computed: true,
MaxItems: 1,
Description: `Reservation Affinity for consuming Zonal reservation.`,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"consume_reservation_type": {
Type: schema.TypeString,
Optional: true,
AtLeastOneOf: reservationAffinityKeys,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{"NO_RESERVATION", "ANY_RESERVATION", "SPECIFIC_RESERVATION"}, false),
Description: `Type of reservation to consume.`,
},
"key": {
Type: schema.TypeString,
Optional: true,
AtLeastOneOf: reservationAffinityKeys,
ForceNew: true,
Description: `Corresponds to the label key of reservation resource.`,
},
"values": {
Type: schema.TypeSet,
Elem: &schema.Schema{Type: schema.TypeString},
Optional: true,
AtLeastOneOf: reservationAffinityKeys,
ForceNew: true,
Description: `Corresponds to the label values of reservation resource.`,
},
},
},
},

"node_group_affinity": {
Type: schema.TypeList,
Optional: true,
AtLeastOneOf: gceClusterConfigKeys,
Computed: true,
MaxItems: 1,
Description: `Node Group Affinity for sole-tenant clusters.`,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"node_group_uri": {
Type: schema.TypeString,
ForceNew: true,
Required: true,
Description: `The URI of a sole-tenant that the cluster will be created on.`,
DiffSuppressFunc: compareSelfLinkOrResourceName,
},
},
},
},
},
},
},
Expand Down Expand Up @@ -669,7 +743,7 @@ func resourceDataprocCluster() *schema.Resource {
"cluster_config.0.preemptible_worker_config.0.disk_config",
},
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{"PREEMPTIBILITY_UNSPECIFIED", "NON_PREEMPTIBLE", "PREEMPTIBLE"}, false),
ValidateFunc: validation.StringInSlice([]string{"PREEMPTIBILITY_UNSPECIFIED", "NON_PREEMPTIBLE", "PREEMPTIBLE", "SPOT"}, false),
Default: "PREEMPTIBLE",
},

Expand Down Expand Up @@ -1022,6 +1096,23 @@ by Dataproc`,
},
},
},
"dataproc_metric_config": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Description: `The config for Dataproc metrics.`,
AtLeastOneOf: clusterConfigKeys,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"metrics": {
Type: schema.TypeList,
Required: true,
Description: `Metrics sources to enable.`,
Elem: metricsSchema(),
},
},
},
},
},
},
},
Expand All @@ -1030,6 +1121,28 @@ by Dataproc`,
}
}

// We need to pull metrics' schema out so we can use it to make a set hash func
func metricsSchema() *schema.Resource {
return &schema.Resource{
Schema: map[string]*schema.Schema{
"metric_source": {
Type: schema.TypeString,
ForceNew: true,
Required: true,
ValidateFunc: validation.StringInSlice([]string{"MONITORING_AGENT_DEFAULTS", "HDFS", "SPARK", "YARN", "SPARK_HISTORY_SERVER", "HIVESERVER2"}, false),
Description: `A source for the collection of Dataproc OSS metrics (see [available OSS metrics] (https://cloud.google.com//dataproc/docs/guides/monitoring#available_oss_metrics)).`,
},
"metric_overrides": {
Type: schema.TypeSet,
Elem: &schema.Schema{Type: schema.TypeString},
Optional: true,
ForceNew: true,
Description: `Specify one or more [available OSS metrics] (https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics) to collect.`,
},
},
}
}

func instanceConfigSchema(parent string) *schema.Schema {
var instanceConfigKeys = []string{
"cluster_config.0." + parent + ".0.num_instances",
Expand Down Expand Up @@ -1480,6 +1593,10 @@ func expandClusterConfig(d *schema.ResourceData, config *Config) (*dataproc.Clus
conf.EndpointConfig = expandEndpointConfig(cfg)
}

if cfg, ok := configOptions(d, "cluster_config.0.dataproc_metric_config"); ok {
conf.DataprocMetricConfig = expandDataprocMetricConfig(cfg)
}

if cfg, ok := configOptions(d, "cluster_config.0.master_config"); ok {
log.Println("[INFO] got master_config")
conf.MasterConfig = expandInstanceGroupConfig(cfg)
Expand Down Expand Up @@ -1558,6 +1675,26 @@ func expandGceClusterConfig(d *schema.ResourceData, config *Config) (*dataproc.G
conf.ShieldedInstanceConfig.EnableVtpm = v.(bool)
}
}
if v, ok := d.GetOk("cluster_config.0.gce_cluster_config.0.reservation_affinity"); ok {
cfgRa := v.([]interface{})[0].(map[string]interface{})
conf.ReservationAffinity = &dataproc.ReservationAffinity{}
if v, ok := cfgRa["consume_reservation_type"]; ok {
conf.ReservationAffinity.ConsumeReservationType = v.(string)
}
if v, ok := cfgRa["key"]; ok {
conf.ReservationAffinity.Key = v.(string)
}
if v, ok := cfgRa["values"]; ok {
conf.ReservationAffinity.Values = convertStringSet(v.(*schema.Set))
}
}
if v, ok := d.GetOk("cluster_config.0.gce_cluster_config.0.node_group_affinity"); ok {
cfgNga := v.([]interface{})[0].(map[string]interface{})
conf.NodeGroupAffinity = &dataproc.NodeGroupAffinity{}
if v, ok := cfgNga["node_group_uri"]; ok {
conf.NodeGroupAffinity.NodeGroupUri = v.(string)
}
}
return conf, nil
}

Expand Down Expand Up @@ -1678,6 +1815,23 @@ func expandEndpointConfig(cfg map[string]interface{}) *dataproc.EndpointConfig {
return conf
}

func expandDataprocMetricConfig(cfg map[string]interface{}) *dataproc.DataprocMetricConfig {
conf := &dataproc.DataprocMetricConfig{}
metricsConfigs := cfg["metrics"].([]interface{})
metricsSet := make([]*dataproc.Metric, 0, len(metricsConfigs))

for _, raw := range metricsConfigs {
data := raw.(map[string]interface{})
metric := dataproc.Metric{
MetricSource: data["metric_source"].(string),
MetricOverrides: convertStringSet(data["metric_overrides"].(*schema.Set)),
}
metricsSet = append(metricsSet, &metric)
}
conf.Metrics = metricsSet
return conf
}

func expandMetastoreConfig(cfg map[string]interface{}) *dataproc.MetastoreConfig {
conf := &dataproc.MetastoreConfig{}
if v, ok := cfg["dataproc_metastore_service"]; ok {
Expand Down Expand Up @@ -2087,6 +2241,7 @@ func flattenClusterConfig(d *schema.ResourceData, cfg *dataproc.ClusterConfig) (
"metastore_config": flattenMetastoreConfig(d, cfg.MetastoreConfig),
"lifecycle_config": flattenLifecycleConfig(d, cfg.LifecycleConfig),
"endpoint_config": flattenEndpointConfig(d, cfg.EndpointConfig),
"dataproc_metric_config": flattenDataprocMetricConfig(d, cfg.DataprocMetricConfig),
}

if len(cfg.InitializationActions) > 0 {
Expand Down Expand Up @@ -2193,6 +2348,26 @@ func flattenEndpointConfig(d *schema.ResourceData, ec *dataproc.EndpointConfig)
return []map[string]interface{}{data}
}

func flattenDataprocMetricConfig(d *schema.ResourceData, dmc *dataproc.DataprocMetricConfig) []map[string]interface{} {
if dmc == nil {
return nil
}

data := map[string]interface{}{}
metricsTypeSet := schema.NewSet(schema.HashResource(metricsSchema()), []interface{}{})
for _, metric := range dmc.Metrics {
data := map[string]interface{}{
"metric_source": metric.MetricSource,
"metric_overrides": metric.MetricOverrides,
}

metricsTypeSet.Add(data)
}
data["metrics"] = metricsTypeSet

return []map[string]interface{}{data}
}

func flattenMetastoreConfig(d *schema.ResourceData, ec *dataproc.MetastoreConfig) []map[string]interface{} {
if ec == nil {
return nil
Expand Down Expand Up @@ -2268,6 +2443,22 @@ func flattenGceClusterConfig(d *schema.ResourceData, gcc *dataproc.GceClusterCon
},
}
}
if gcc.ReservationAffinity != nil {
gceConfig["reservation_affinity"] = []map[string]interface{}{
{
"consume_reservation_type": gcc.ReservationAffinity.ConsumeReservationType,
"key": gcc.ReservationAffinity.Key,
"values": gcc.ReservationAffinity.Values,
},
}
}
if gcc.NodeGroupAffinity != nil {
gceConfig["node_group_affinity"] = []map[string]interface{}{
{
"node_group_uri": gcc.NodeGroupAffinity.NodeGroupUri,
},
}
}

return []map[string]interface{}{gceConfig}
}
Expand Down

0 comments on commit dca9c11

Please sign in to comment.