Skip to content

Commit

Permalink
azurerm_data_factory_pipeline: Support for activities (#6224)
Browse files Browse the repository at this point in the history
  • Loading branch information
lawrencegripper committed Jun 2, 2020
1 parent f7b09eb commit cfae3b2
Show file tree
Hide file tree
Showing 5 changed files with 423 additions and 9 deletions.
37 changes: 37 additions & 0 deletions azurerm/internal/services/datafactory/data_factory.go
@@ -1,6 +1,7 @@
package datafactory

import (
"encoding/json"
"fmt"
"log"
"regexp"
Expand All @@ -9,6 +10,7 @@ import (

"github.com/Azure/azure-sdk-for-go/services/datafactory/mgmt/2018-06-01/datafactory"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/azure"
)

func validateAzureRMDataFactoryLinkedServiceDatasetName(v interface{}, k string) (warnings []string, errors []error) {
Expand Down Expand Up @@ -190,3 +192,38 @@ func flattenDataFactoryStructureColumns(input interface{}) []interface{} {
}
return output
}

func deserializeDataFactoryPipelineActivities(jsonData string) (*[]datafactory.BasicActivity, error) {
jsonData = fmt.Sprintf(`{ "activities": %s }`, jsonData)
pipeline := &datafactory.Pipeline{}
err := pipeline.UnmarshalJSON([]byte(jsonData))
if err != nil {
return nil, err
}
return pipeline.Activities, nil
}

func serializeDataFactoryPipelineActivities(activities *[]datafactory.BasicActivity) (string, error) {
pipeline := &datafactory.Pipeline{Activities: activities}
result, err := pipeline.MarshalJSON()
if err != nil {
return "nil", err
}

var m map[string]*json.RawMessage
err = json.Unmarshal(result, &m)
if err != nil {
return "", err
}

activitiesJson, err := json.Marshal(m["activities"])
if err != nil {
return "", err
}

return string(activitiesJson), nil
}

func suppressJsonOrderingDifference(_, old, new string, _ *schema.ResourceData) bool {
return azure.NormalizeJson(old) == azure.NormalizeJson(new)
}
Expand Up @@ -72,6 +72,13 @@ func resourceArmDataFactoryPipeline() *schema.Resource {
Optional: true,
},

"activities_json": {
Type: schema.TypeString,
Optional: true,
StateFunc: azure.NormalizeJson,
DiffSuppressFunc: suppressJsonOrderingDifference,
},

"annotations": {
Type: schema.TypeList,
Optional: true,
Expand All @@ -98,7 +105,7 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta int
existing, err := client.Get(ctx, resourceGroupName, dataFactoryName, name, "")
if err != nil {
if !utils.ResponseWasNotFound(existing.Response) {
return fmt.Errorf("Error checking for presence of existing Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %s", name, resourceGroupName, dataFactoryName, err)
return fmt.Errorf("checking for presence of existing Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %s", name, resourceGroupName, dataFactoryName, err)
}
}

Expand All @@ -114,6 +121,14 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta int
Description: &description,
}

if v, ok := d.GetOk("activities_json"); ok {
activities, err := deserializeDataFactoryPipelineActivities(v.(string))
if err != nil {
return fmt.Errorf("parsing 'activities_json' for Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID: %+v", name, resourceGroupName, dataFactoryName, err)
}
pipeline.Activities = activities
}

if v, ok := d.GetOk("annotations"); ok {
annotations := v.([]interface{})
pipeline.Annotations = &annotations
Expand All @@ -127,16 +142,16 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta int
}

if _, err := client.CreateOrUpdate(ctx, resourceGroupName, dataFactoryName, name, config, ""); err != nil {
return fmt.Errorf("Error creating Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
return fmt.Errorf("creating Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
}

read, err := client.Get(ctx, resourceGroupName, dataFactoryName, name, "")
if err != nil {
return fmt.Errorf("Error retrieving Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
return fmt.Errorf("retrieving Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
}

if read.ID == nil {
return fmt.Errorf("Cannot read Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID", name, resourceGroupName, dataFactoryName)
return fmt.Errorf("cannot read Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID", name, resourceGroupName, dataFactoryName)
}

d.SetId(*read.ID)
Expand All @@ -163,7 +178,7 @@ func resourceArmDataFactoryPipelineRead(d *schema.ResourceData, meta interface{}
log.Printf("[DEBUG] Data Factory Pipeline %q was not found in Resource Group %q - removing from state!", name, id.ResourceGroup)
return nil
}
return fmt.Errorf("Error reading the state of Data Factory Pipeline %q: %+v", name, err)
return fmt.Errorf("reading the state of Data Factory Pipeline %q: %+v", name, err)
}

d.Set("name", resp.Name)
Expand All @@ -175,17 +190,27 @@ func resourceArmDataFactoryPipelineRead(d *schema.ResourceData, meta interface{}

parameters := flattenDataFactoryParameters(props.Parameters)
if err := d.Set("parameters", parameters); err != nil {
return fmt.Errorf("Error setting `parameters`: %+v", err)
return fmt.Errorf("setting `parameters`: %+v", err)
}

annotations := flattenDataFactoryAnnotations(props.Annotations)
if err := d.Set("annotations", annotations); err != nil {
return fmt.Errorf("Error setting `annotations`: %+v", err)
return fmt.Errorf("setting `annotations`: %+v", err)
}

variables := flattenDataFactoryVariables(props.Variables)
if err := d.Set("variables", variables); err != nil {
return fmt.Errorf("Error setting `variables`: %+v", err)
return fmt.Errorf("setting `variables`: %+v", err)
}

if activities := props.Activities; activities != nil {
activitiesJson, err := serializeDataFactoryPipelineActivities(activities)
if err != nil {
return fmt.Errorf("serializing `activities_json`: %+v", err)
}
if err := d.Set("activities_json", activitiesJson); err != nil {
return fmt.Errorf("setting `activities_json`: %+v", err)
}
}
}

Expand All @@ -206,7 +231,7 @@ func resourceArmDataFactoryPipelineDelete(d *schema.ResourceData, meta interface
resourceGroupName := id.ResourceGroup

if _, err = client.Delete(ctx, resourceGroupName, dataFactoryName, name); err != nil {
return fmt.Errorf("Error deleting Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
return fmt.Errorf("deleting Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
}

return nil
Expand Down
165 changes: 165 additions & 0 deletions azurerm/internal/services/datafactory/data_factory_test.go
Expand Up @@ -38,3 +38,168 @@ func TestAzureRmDataFactoryLinkedServiceConnectionStringDiff(t *testing.T) {
}
}
}

func TestAzureRmDataFactoryDeserializePipelineActivities(t *testing.T) {
cases := []struct {
Json string
ExpectActivityCount int
ExpectErr bool
}{
{
Json: "{}",
ExpectActivityCount: 0,
ExpectErr: true,
},
{
Json: `[
{
"type": "ForEach",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.OutputBlobNameList",
"type": "Expression"
},
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
},
"dataIntegrationUnits": 32
},
"inputs": [
{
"referenceName": "exampleDataset",
"parameters": {
"MyFolderPath": "examplecontainer",
"MyFileName": "examplecontainer.csv"
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "exampleDataset",
"parameters": {
"MyFolderPath": "examplecontainer",
"MyFileName": {
"value": "@item()",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"name": "ExampleCopyActivity"
}
]
},
"name": "ExampleForeachActivity"
}
]`,
ExpectActivityCount: 1,
ExpectErr: false,
},
}

for _, tc := range cases {
items, err := deserializeDataFactoryPipelineActivities(tc.Json)
if err != nil {
if tc.ExpectErr {
t.Log("Expected error and got error")
return
}

t.Fatal(err)
}

if items == nil && !tc.ExpectErr {
t.Fatal("Expected items got nil")
}

if len(*items) != tc.ExpectActivityCount {
t.Fatal("Failed to deserialise pipeline")
}
}
}

func TestNormalizeJSON(t *testing.T) {
cases := []struct {
Old string
New string
Suppress bool
}{
{
Old: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "bob",
"value": "something"
}
}
]`,
New: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"value": "something",
"variableName": "bob"
}
}
]`,
Suppress: true,
},
{
Old: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "bobdifferent",
"value": "something"
}
}
]`,
New: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"value": "something",
"variableName": "bob"
}
}
]`,
Suppress: false,
},
{
Old: `{ "notbob": "notbill" }`,
New: `{ "bob": "bill" }`,
Suppress: false,
},
}

for _, tc := range cases {
suppress := suppressJsonOrderingDifference("test", tc.Old, tc.New, nil)

if suppress != tc.Suppress {
t.Fatalf("Expected JsonOrderingDifference to be '%t' for '%s' '%s' - got '%t'", tc.Suppress, tc.Old, tc.New, suppress)
}
}
}

0 comments on commit cfae3b2

Please sign in to comment.