Skip to content

Commit

Permalink
Basic work on pipeline activities
Browse files Browse the repository at this point in the history
  • Loading branch information
lawrencegripper committed Mar 23, 2020
1 parent a7ceca7 commit 4dab518
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 3 deletions.
4 changes: 2 additions & 2 deletions .vscode/launch.json
Expand Up @@ -7,11 +7,11 @@
"request": "launch",
"mode": "test",
// Update this path with the integration test you'd like to debug
"program": "${workspaceRoot}/azurerm/internal/services/storage/tests/data_source_storage_account_test.go",
"program": "${workspaceRoot}/azurerm/internal/services/datafactory/tests/resource_arm_data_factory_pipeline_test.go",
"args": [
"-test.v",
"-test.run",
"TestAccDataSourceAzureRMStorageAccount_basic"
"TestAccAzureRMDataFactoryPipeline_basic"
],
// Copy the `example.private.env` file in `./vscode` to `private.env` and fill in the required values
"envFile": "${workspaceRoot}/.vscode/private.env",
Expand Down
40 changes: 40 additions & 0 deletions azurerm/internal/services/datafactory/data_factory.go
@@ -1,6 +1,7 @@
package datafactory

import (
"encoding/json"
"fmt"
"log"
"regexp"
Expand All @@ -9,6 +10,7 @@ import (

"github.com/Azure/azure-sdk-for-go/services/datafactory/mgmt/2018-06-01/datafactory"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/azure"
)

func validateAzureRMDataFactoryLinkedServiceDatasetName(v interface{}, k string) (warnings []string, errors []error) {
Expand Down Expand Up @@ -190,3 +192,41 @@ func flattenDataFactoryStructureColumns(input interface{}) []interface{} {
}
return output
}

// deserializeDataFactoryPipelineActivities parses a raw JSON array of pipeline
// activities into the SDK's polymorphic BasicActivity slice. The array is
// wrapped in a pipeline envelope so the SDK's custom Pipeline unmarshaller can
// be reused to resolve the concrete activity types.
func deserializeDataFactoryPipelineActivities(jsonData string) (*[]datafactory.BasicActivity, error) {
	wrapped := fmt.Sprintf(`{ "activities": %s }`, jsonData)

	var pipeline datafactory.Pipeline
	if err := pipeline.UnmarshalJSON([]byte(wrapped)); err != nil {
		return nil, err
	}

	return pipeline.Activities, nil
}

// serializeDataFactoryPipelineActivities renders a slice of pipeline
// activities back into the raw JSON array form used by the `activities_json`
// schema attribute. It round-trips through the SDK's Pipeline marshaller so
// polymorphic activity types are serialized with their discriminators, then
// extracts just the "activities" array from the resulting document.
func serializeDataFactoryPipelineActivities(activities *[]datafactory.BasicActivity) (string, error) {
	pipeline := &datafactory.Pipeline{Activities: activities}
	result, err := pipeline.MarshalJSON()
	if err != nil {
		// Previously this returned the literal string "nil" on error; return
		// the empty string like every other error path in this function.
		return "", err
	}

	var m map[string]*json.RawMessage
	if err := json.Unmarshal(result, &m); err != nil {
		return "", err
	}

	activitiesJSON, err := json.Marshal(m["activities"])
	if err != nil {
		return "", err
	}

	return string(activitiesJSON), nil
}

// suppressJsonOrderingDifference is a schema.DiffSuppressFunc that treats two
// JSON strings as equal when they differ only in formatting or key ordering,
// by comparing their normalized forms.
func suppressJsonOrderingDifference(_, old, new string, _ *schema.ResourceData) bool {
	// Return the comparison directly rather than `if cond { return true } return false`.
	return azure.NormalizeJson(old) == azure.NormalizeJson(new)
}
165 changes: 165 additions & 0 deletions azurerm/internal/services/datafactory/data_factory_test.go
Expand Up @@ -38,3 +38,168 @@ func TestAzureRmDataFactoryLinkedServiceConnectionStringDiff(t *testing.T) {
}
}
}

// TestAzureRmDataFactoryDeserializePipelineActivities covers both the error
// path (a JSON object where an array is expected) and the happy path (a
// nested ForEach/Copy activity definition) of
// deserializeDataFactoryPipelineActivities.
func TestAzureRmDataFactoryDeserializePipelineActivities(t *testing.T) {
	cases := []struct {
		Json                string
		ExpectActivityCount int
		ExpectErr           bool
	}{
		{
			Json:                "{}",
			ExpectActivityCount: 0,
			ExpectErr:           true,
		},
		{
			Json: `[
	{
		"type": "ForEach",
		"typeProperties": {
			"isSequential": true,
			"items": {
				"value": "@pipeline().parameters.OutputBlobNameList",
				"type": "Expression"
			},
			"activities": [
				{
					"type": "Copy",
					"typeProperties": {
						"source": {
							"type": "BlobSource"
						},
						"sink": {
							"type": "BlobSink"
						},
						"dataIntegrationUnits": 32
					},
					"inputs": [
						{
							"referenceName": "exampleDataset",
							"parameters": {
								"MyFolderPath": "examplecontainer",
								"MyFileName": "examplecontainer.csv"
							},
							"type": "DatasetReference"
						}
					],
					"outputs": [
						{
							"referenceName": "exampleDataset",
							"parameters": {
								"MyFolderPath": "examplecontainer",
								"MyFileName": {
									"value": "@item()",
									"type": "Expression"
								}
							},
							"type": "DatasetReference"
						}
					],
					"name": "ExampleCopyActivity"
				}
			]
		},
		"name": "ExampleForeachActivity"
	}
]`,
			ExpectActivityCount: 1,
			ExpectErr:           false,
		},
	}

	for _, tc := range cases {
		items, err := deserializeDataFactoryPipelineActivities(tc.Json)

		if tc.ExpectErr {
			// Previously a bare `return` here ended the whole test after the
			// first expected-error case, silently skipping the remaining
			// cases. Use `continue` so every case runs, and fail explicitly
			// when an expected error did not occur.
			if err == nil {
				t.Fatalf("Expected an error deserializing %q but got none", tc.Json)
			}
			t.Log("Expected error and got error")
			continue
		}

		if err != nil {
			t.Fatal(err)
		}

		if items == nil {
			t.Fatal("Expected items got nil")
		}

		if len(*items) != tc.ExpectActivityCount {
			t.Fatal("Failed to deserialise pipeline")
		}
	}
}

// TestNormalizeJSON checks that suppressJsonOrderingDifference suppresses a
// diff when two JSON documents differ only in key ordering, and leaves
// genuine value differences unsuppressed.
func TestNormalizeJSON(t *testing.T) {
	testData := []struct {
		Old      string
		New      string
		Suppress bool
	}{
		{
			// Identical content, reordered typeProperties keys: suppressed.
			Old: `[
	{
		"name": "Append variable1",
		"type": "AppendVariable",
		"dependsOn": [],
		"userProperties": [],
		"typeProperties": {
			"variableName": "bob",
			"value": "something"
		}
	}
]`,
			New: `[
	{
		"name": "Append variable1",
		"type": "AppendVariable",
		"dependsOn": [],
		"userProperties": [],
		"typeProperties": {
			"value": "something",
			"variableName": "bob"
		}
	}
]`,
			Suppress: true,
		},
		{
			// Different variableName value: not suppressed.
			Old: `[
	{
		"name": "Append variable1",
		"type": "AppendVariable",
		"dependsOn": [],
		"userProperties": [],
		"typeProperties": {
			"variableName": "bobdifferent",
			"value": "something"
		}
	}
]`,
			New: `[
	{
		"name": "Append variable1",
		"type": "AppendVariable",
		"dependsOn": [],
		"userProperties": [],
		"typeProperties": {
			"value": "something",
			"variableName": "bob"
		}
	}
]`,
			Suppress: false,
		},
		{
			// Entirely different documents: not suppressed.
			Old:      `{ "notbob": "notbill" }`,
			New:      `{ "bob": "bill" }`,
			Suppress: false,
		},
	}

	for _, v := range testData {
		actual := suppressJsonOrderingDifference("test", v.Old, v.New, nil)
		if actual != v.Suppress {
			t.Fatalf("Expected JsonOrderingDifference to be '%t' for '%s' '%s' - got '%t'", v.Suppress, v.Old, v.New, actual)
		}
	}
}
Expand Up @@ -73,6 +73,13 @@ func resourceArmDataFactoryPipeline() *schema.Resource {
Optional: true,
},

"activities_json": {
Type: schema.TypeString,
Optional: true,
StateFunc: azure.NormalizeJson,
DiffSuppressFunc: suppressJsonOrderingDifference,
},

"annotations": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -108,11 +115,18 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta int
}
}

activitiesJson := d.Get("activities_json").(string)
activities, err := deserializeDataFactoryPipelineActivities(activitiesJson)
if err != nil {
return fmt.Errorf("Error parsing 'activities_json' for Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID: %+v", name, resourceGroupName, dataFactoryName, err)
}

description := d.Get("description").(string)
pipeline := &datafactory.Pipeline{
Parameters: expandDataFactoryParameters(d.Get("parameters").(map[string]interface{})),
Variables: expandDataFactoryVariables(d.Get("variables").(map[string]interface{})),
Description: &description,
Activities: activities,
}

if v, ok := d.GetOk("annotations"); ok {
Expand Down Expand Up @@ -188,6 +202,14 @@ func resourceArmDataFactoryPipelineRead(d *schema.ResourceData, meta interface{}
if err := d.Set("variables", variables); err != nil {
return fmt.Errorf("Error setting `variables`: %+v", err)
}

activitiesJson, err := serializeDataFactoryPipelineActivities(props.Activities)
if err != nil {
return fmt.Errorf("Error serializing `activities_json`: %+v", err)
}
if err := d.Set("activities_json", activitiesJson); err != nil {
return fmt.Errorf("Error setting `activities_json`: %+v", err)
}
}

return nil
Expand Down
Expand Up @@ -23,6 +23,8 @@ func TestAccAzureRMDataFactoryPipeline_basic(t *testing.T) {
Config: testAccAzureRMDataFactoryPipeline_basic(data),
Check: resource.ComposeTestCheckFunc(
testCheckAzureRMDataFactoryPipelineExists(data.ResourceName),
resource.TestCheckResourceAttrSet(data.ResourceName, "activities_json"),
testCheckAzureRMDataFactoryPipelineHasAppenVarActivity(data.ResourceName, "Append variable1"),
),
},
data.ImportStep(),
Expand Down Expand Up @@ -115,6 +117,51 @@ func testCheckAzureRMDataFactoryPipelineExists(resourceName string) resource.Tes
}
}

// testCheckAzureRMDataFactoryPipelineHasAppenVarActivity returns a check that
// the named pipeline's first activity is an AppendVariable activity with the
// given name, fetched live from the Data Factory API.
// NOTE(review): the function name has a typo ("Appen"); left unchanged so the
// existing test-step reference keeps compiling.
func testCheckAzureRMDataFactoryPipelineHasAppenVarActivity(resourceName string, activityName string) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		client := acceptance.AzureProvider.Meta().(*clients.Client).DataFactory.PipelinesClient
		ctx := acceptance.AzureProvider.Meta().(*clients.Client).StopContext

		rs, ok := s.RootModule().Resources[resourceName]
		if !ok {
			return fmt.Errorf("Not found: %s", resourceName)
		}

		name := rs.Primary.Attributes["name"]
		dataFactoryName := rs.Primary.Attributes["data_factory_name"]
		resourceGroup := rs.Primary.Attributes["resource_group_name"]

		resp, err := client.Get(ctx, resourceGroup, dataFactoryName, name, "")
		if err != nil {
			if utils.ResponseWasNotFound(resp.Response) {
				return fmt.Errorf("Bad: Data Factory Pipeline %q (Resource Group %q / Data Factory %q) does not exist", name, resourceGroup, dataFactoryName)
			}
			return fmt.Errorf("Bad: Get on DataFactoryPipelineClient: %+v", err)
		}

		// Guard against a nil or empty activities list before indexing —
		// previously this dereferenced and indexed unconditionally and would
		// panic instead of failing the test cleanly.
		if resp.Activities == nil || len(*resp.Activities) == 0 {
			return fmt.Errorf("Bad: Data Factory Pipeline %q (Resource Group %q / Data Factory %q) has no activities", name, resourceGroup, dataFactoryName)
		}

		// Check the cast result instead of discarding it: a failed cast
		// returns nil and the old code would nil-deref on `.Name`.
		appvarActivity, ok := (*resp.Activities)[0].AsAppendVariableActivity()
		if !ok || appvarActivity.Name == nil || *appvarActivity.Name != activityName {
			return fmt.Errorf("Bad: Data Factory Pipeline %q (Resource Group %q / Data Factory %q) could not cast as activity", name, resourceGroup, dataFactoryName)
		}

		return nil
	}
}

// activities_json is the raw activities fixture interpolated into the
// acceptance-test HCL as the `activities_json` attribute: a single
// AppendVariable activity named "Append variable1".
// NOTE(review): Go convention would be camelCase (activitiesJSON); renaming
// requires updating the configuration builders that reference it.
var activities_json = `[
    {
        "name": "Append variable1",
        "type": "AppendVariable",
        "dependsOn": [],
        "userProperties": [],
        "typeProperties": {
            "variableName": "bob",
            "value": "something"
        }
    }
]`

func testAccAzureRMDataFactoryPipeline_basic(data acceptance.TestData) string {
return fmt.Sprintf(`
provider "azurerm" {
Expand All @@ -136,8 +183,14 @@ resource "azurerm_data_factory_pipeline" "test" {
name = "acctest%d"
resource_group_name = azurerm_resource_group.test.name
data_factory_name = azurerm_data_factory.test.name
variables = {
"bob" = "item1"
}
activities_json = <<JSON
%s
JSON
}
`, data.RandomInteger, data.Locations.Primary, data.RandomInteger, data.RandomInteger)
`, data.RandomInteger, data.Locations.Primary, data.RandomInteger, data.RandomInteger, activities_json)
}

func testAccAzureRMDataFactoryPipeline_update1(data acceptance.TestData) string {
Expand Down

0 comments on commit 4dab518

Please sign in to comment.