Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed May 16, 2024
1 parent 08d7b91 commit 7e74f0a
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0
import os
import logging
import os

from vdk.api.job_input import IJobInput
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
Expand Down Expand Up @@ -30,71 +30,80 @@ def run(job_input: IJobInput):
source_view = job_arguments.get("source_view")
target_schema = job_arguments.get("target_schema")
target_table = job_arguments.get("target_table")
insert_query = CommonUtilities.get_file_content(SQL_FILES_FOLDER, "02-insert-into-target.sql")
create_table_query = CommonUtilities.get_file_content(SQL_FILES_FOLDER, "02-create-clone-table.sql")
create_view_query = CommonUtilities.get_file_content(SQL_FILES_FOLDER, "02-create-consolidated-view.sql")
drop_table_query = CommonUtilities.get_file_content(SQL_FILES_FOLDER, "02-drop-table.sql")
drop_view_query = CommonUtilities.get_file_content(SQL_FILES_FOLDER, "02-drop-view.sql")
insert_query = CommonUtilities.get_file_content(
SQL_FILES_FOLDER, "02-insert-into-target.sql"
)
create_table_query = CommonUtilities.get_file_content(
SQL_FILES_FOLDER, "02-create-clone-table.sql"
)
create_view_query = CommonUtilities.get_file_content(
SQL_FILES_FOLDER, "02-create-consolidated-view.sql"
)
drop_table_query = CommonUtilities.get_file_content(
SQL_FILES_FOLDER, "02-drop-table.sql"
)
drop_view_query = CommonUtilities.get_file_content(
SQL_FILES_FOLDER, "02-drop-view.sql"
)

staging_schema = job_arguments.get("staging_schema", target_schema)
staging_table_name = CommonUtilities.get_staging_table_name(target_schema, target_table)
staging_table_name = CommonUtilities.get_staging_table_name(
target_schema, target_table
)

target_table_full_name = f"{target_schema}.{target_table}"

#Drop table if already exists
# Drop table if already exists
drop_staging_table = drop_table_query.format(
target_schema=staging_schema,
target_table=staging_table_name
target_schema=staging_schema, target_table=staging_table_name
)
job_input.execute_query(drop_staging_table)
#create staging table with exact schema of target table
# create staging table with exact schema of target table
create_staging_table = create_table_query.format(
table_schema=staging_schema,
table_name=staging_table_name,
target_schema=target_schema,
target_table=target_table
target_table=target_table,
)
job_input.execute_query(create_staging_table)
#insert all source_view data into staging table
# insert all source_view data into staging table
insert_into_staging = insert_query.format(
target_schema=staging_schema,
target_table=staging_table_name,
source_schema=source_schema,
source_view=source_view
source_view=source_view,
)
job_input.execute_query(insert_into_staging)

view_schema = staging_schema
view_name = f"vw_{staging_table_name}"
# Drop view if already exist
drop_view = drop_view_query.format(
view_schema=view_schema,
view_name=view_name
)
drop_view = drop_view_query.format(view_schema=view_schema, view_name=view_name)
job_input.execute_query(drop_view)

#create consolidated view of staging and target table
# create consolidated view of staging and target table
create_view = create_view_query.format(
view_schema=view_schema,
view_name=view_name,
target_schema=target_schema,
target_table=target_table,
staging_schema=staging_schema,
staging_table=staging_table_name
staging_table=staging_table_name,
)
job_input.execute_query(create_view)

view_full_name = f"{view_schema}.{view_name}"

if check(view_full_name):
job_input.execute_query(f"SELECT * FROM "
f"information_schema.tables WHERE table_schema = '{staging_schema}' "
f"AND table_name = '{staging_table_name}'")
job_input.execute_query(
f"SELECT * FROM "
f"information_schema.tables WHERE table_schema = '{staging_schema}' "
f"AND table_name = '{staging_table_name}'"
)
log.debug("Data View is created successfully.")
else:
raise DataQualityException(
checked_object=view_full_name,
source_view=f"{source_schema}.{source_view}",
target_table=target_table_full_name,
)

Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CREATE TABLE IF NOT EXISTS "{table_schema}"."{table_name}"(

Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-clone-table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-clone-table.sql#L1

Expected SET ANSI_NULLS ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-clone-table.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-clone-table.sql#L1

Expected SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED near top of file
LIKE "{target_schema}"."{target_table}"
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ FROM
UNION ALL
SELECT *
FROM {staging_schema}.{staging_table}
)
)
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DROP View IF EXISTS "{view_schema}"."{view_name}"
DROP View IF EXISTS "{view_schema}"."{view_name}"

Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql#L1

Expected SET ANSI_NULLS ON near top of file

Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql#L1

Expected SET QUOTED_IDENTIFIER ON near top of file

Check warning on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql#L1

Expected TSQL Keyword to be capitalized
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ INSERT INTO {target_schema}.{target_table}
(
SELECT *
FROM {source_schema}.{source_view}
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ def __get_backup_table_name(table_name):


class CommonUtilities:

def __init__(self):
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
def run(job_input: IJobInput) -> None:
# Step 1: create a table that represents the current state

job_input.execute_query(u'''
job_input.execute_query(
"""
DROP TABLE IF EXISTS {target_schema}.{target_table}
''')
"""
)
job_input.execute_query(
"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
Expand All @@ -23,7 +25,7 @@ def run(job_input: IJobInput) -> None:
dim_date_id TIMESTAMP,
host_count BIGINT,
cluster_count BIGINT
)
)
"""
)
job_input.execute_query(
Expand All @@ -37,9 +39,11 @@ def run(job_input: IJobInput) -> None:

# Step 2: create a table that represents the next snapshot

job_input.execute_query(u'''
job_input.execute_query(
"""
DROP TABLE IF EXISTS {source_schema}.{source_view}
''')
"""
)
job_input.execute_query(
"""
CREATE TABLE IF NOT EXISTS {source_schema}.{source_view} (
Expand All @@ -48,7 +52,7 @@ def run(job_input: IJobInput) -> None:
dim_date_id TIMESTAMP,
host_count BIGINT,
cluster_count BIGINT
)
)
"""
)
job_input.execute_query(
Expand All @@ -62,9 +66,11 @@ def run(job_input: IJobInput) -> None:

# Step 3: Create a table containing the state expected after updating the current state with the next snapshot

job_input.execute_query(u'''
job_input.execute_query(
"""
DROP TABLE IF EXISTS {expect_schema}.{expect_table}
''')
"""
)
job_input.execute_query(
"""
CREATE TABLE IF NOT EXISTS {expect_schema}.{expect_table} (
Expand All @@ -73,7 +79,7 @@ def run(job_input: IJobInput) -> None:
dim_date_id TIMESTAMP,
host_count BIGINT,
cluster_count BIGINT
)
)
"""
)
job_input.execute_query(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def run(job_input: IJobInput) -> None:
template_args=template_args,
)


def setup_testing_check(check):
if check == "use_positive_check":
check = _sample_check_true
Expand Down
51 changes: 35 additions & 16 deletions projects/vdk-plugins/vdk-trino/tests/test_vdk_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,12 +600,14 @@ def __scd2_template_check_expected_res(
), f"Elements in {target_table} and {expect_table} differ."

def test_insert(self) -> None:
test_schema = self.__schema
test_schema = self.__schema
source_view = "vw_fact_vmc_utilization_cpu_mem_every5min_daily"
target_table = "dw_fact_vmc_utilization_cpu_mem_every5min_daily"
expect_table = "ex_fact_vmc_utilization_cpu_mem_every5min_daily"

res = self.__fact_insert_template_execute(test_schema, source_view,target_table, expect_table)
res = self.__fact_insert_template_execute(
test_schema, source_view, target_table, expect_table
)

cli_assert(not res.exception, res)

Expand All @@ -621,14 +623,20 @@ def test_insert(self) -> None:
)

def test_insert_checks_positive(self) -> None:
test_schema = self.__schema
test_schema = self.__schema
staging_schema = "staging_vdkprototypes"
source_view = "vw_fact_vmc_utilization_cpu_mem_every5min_daily_check_positive"
target_table = "dw_fact_vmc_utilization_cpu_mem_every5min_daily_check_positive"
expect_table = "ex_fact_vmc_utilization_cpu_mem_every5min_daily_check_positive"

res = self.__fact_insert_template_execute(
test_schema, source_view,target_table, expect_table, "use_positive_check", staging_schema)
test_schema,
source_view,
target_table,
expect_table,
"use_positive_check",
staging_schema,
)

cli_assert(not res.exception, res)
actual_rs = self.__trino_query(f"SELECT * FROM {test_schema}.{target_table}")
Expand All @@ -643,14 +651,20 @@ def test_insert_checks_positive(self) -> None:
)

def test_insert_checks_negative(self) -> None:
test_schema = self.__schema
test_schema = self.__schema
staging_schema = "staging_vdkprototypes"
source_view = "vw_fact_vmc_utilization_cpu_mem_every5min_daily_check_negative"
target_table = "dw_fact_vmc_utilization_cpu_mem_every5min_daily_check_negative"
expect_table = "ex_fact_vmc_utilization_cpu_mem_every5min_daily_check_negative"

res = self.__fact_insert_template_execute(
test_schema, source_view,target_table, expect_table, "use_negative_check", staging_schema)
test_schema,
source_view,
target_table,
expect_table,
"use_negative_check",
staging_schema,
)

assert res.exception
actual_rs = self.__trino_query(f"SELECT * FROM {test_schema}.{target_table}")
Expand All @@ -659,14 +673,20 @@ def test_insert_checks_negative(self) -> None:
assert actual_rs.output != expected_rs.output

def test_insert_clean_staging(self) -> None:
test_schema = self.__schema
test_schema = self.__schema
staging_schema = "staging_vdkprototypes"
source_view = "vw_fact_vmc_utilization_cpu_mem_every5min_daily_clean_staging"
target_table = "dw_fact_vmc_utilization_cpu_mem_every5min_daily_clean_staging"
expect_table = "ex_fact_vmc_utilization_cpu_mem_every5min_daily_clean_staging"

res_first_exec = self.__fact_insert_template_execute(
test_schema, source_view,target_table, expect_table, "use_positive_check", staging_schema)
test_schema,
source_view,
target_table,
expect_table,
"use_positive_check",
staging_schema,
)

staging_table_name = f"vdk_check_{test_schema}_{target_table}"
first_exec_rs = self.__trino_query(
Expand Down Expand Up @@ -702,13 +722,13 @@ def test_insert_clean_staging(self) -> None:
)

def __fact_insert_template_execute(
self,
test_schema,
source_view,
target_table,
expect_table,
check=False,
staging_schema=None,
self,
test_schema,
source_view,
target_table,
expect_table,
check=False,
staging_schema=None,
):
if check != False and staging_schema is not None:
return self.__runner.invoke(
Expand Down Expand Up @@ -772,4 +792,3 @@ def __trino_query(self, query: str) -> Result:
query,
]
)

0 comments on commit 7e74f0a

Please sign in to comment.